Spark和SparkSQL

Spark是Mapreduce的下一代的分布式计算框架。相比更早期的Mapreduce的Job和Task的两层,Spark更为灵活,其执行粒度分为Application、Job、Stage和Task四个层次。

【未完待续】

Spark Core

RDD

RDD(Resilient Distributed Dataset),即弹性数据集是Spark中的基础结构。RDD是distributive的、immutable的,可以存在在内存中,也可以被缓存。
对RDD具有转换操作和行动操作两种截然不同的操作。转换(Transform)操作始终在RDD的Context里面,但行动(Action)操作会去掉RDD的壳。例如take是行动操作,返回的是一个数组而不是RDD了,在Scala中可以看到。

1
2
3
4
5
6
7
8
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.take(1)
res0: Array[Int] = Array(10)

scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)

转换操作是Lazy的,直到遇到一个Action操作,Spark才会生成关于整条链的执行计划并执行。这些Action操作将一个Spark Application分为了多个Job。

常见RDD

RDD是一个抽象类abstract class RDD[T] extends Serializable with Logging,在Spark中有诸如ShuffledRDD、HadoopRDD等实现。每个RDD都有对应的compute方法,用来描述这个RDD的计算方法。需要注意的是,这些RDD可能被作为某些RDD计算的中间结果,例如CoGroupedRDD,对应的,例如MapPartitionsRDD也可能是经过多个RDD变换得到的,其决定权在于所使用的算子。
我们来具体查看一些RDD。

  1. ParallelCollectionRDD
    这个RDD由parallelize得到

    1
    2
    scala> val arr = sc.parallelize(0 to 1000)
    arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
  2. HadoopRDD

    1
    class HadoopRDD[K, V] extends RDD[(K, V)] with Logging
  3. FileScanRDD
    这个RDD一般从spark.read.text(...)语句中产生,所以实现在sql模块中

    1
    2
    3
    4
    5
    class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
    extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
  4. MapPartitionsRDD

    1
    class MapPartitionsRDD[U, T] extends RDD[U]

    这个RDD是mapmapPartitionsmapPartitionsWithIndex操作的结果。
    注意,在较早期的版本中,map会得到一个MappedRDDfilter会得到一个FilteredRDDflatMap会得到一个FlatMappedRDD,不过目前已经找不到了,统一变成MapPartitionsRDD

    1
    2
    3
    4
    5
    6
    scala> val a3 = arr.map(i => (i+1, i))
    a3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25
    scala> val a3 = arr.filter(i => i > 3)
    a3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:25
    scala> val a3 = arr.flatMap(i => Array(i))
    a3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at flatMap at <console>:25

    join操作的结果也是MapPartitionsRDD,这是因为其执行过程的最后一步flatMapValues会创建一个MapPartitionsRDD

    1
    2
    3
    4
    5
    6
    7
    8
    scala> val rdd1 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24

    scala> val rddj = rdd1.join(rdd2)
    rddj: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:27
  5. ShuffledRDD
    ShuffledRDD用来存储所有Shuffle操作的结果,其中KV很好理解,C是Combiner Class。

    1
    class ShuffledRDD[K, V, C] extends RDD[(K, C)]

    groupByKey为例

    1
    2
    3
    4
    5
    scala> val a2 = arr.map(i => (i+1, i))
    a2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25

    scala> a2.groupByKey
    res1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[3] at groupByKey at <console>:26

    注意,groupByKey需要K是Hashable的,否则会报错。

    1
    2
    3
    4
    5
    6
    7
    scala> val a2 = arr.map(i => (Array.fill(10)(i), i))
    a2: org.apache.spark.rdd.RDD[(Array[Int], Int)] = MapPartitionsRDD[2] at map at <console>:25

    scala> a2.groupByKey
    org.apache.spark.SparkException: HashPartitioner cannot partition array keys.
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:84)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
  6. CoGroupedRDD

    1
    class CoGroupedRDD[K] extends RDD[(K, Array[Iterable[_]])]

    首先,我们需要了解一下什么是cogroup操作,这个方法有多个重载版本。如下所示的版本,对thisother1other2的所有的key,生成一个RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2])),表示对于这个key,这三个RDD中所有值的集合。容易看到,这个算子能够被用来实现Join和Union(不过后者有点大材小用了)

    1
    2
    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

    这里的Partitioner是一个abstract class,具有numPartitions: IntgetPartition(key: Any): Int两个方法。通过继承Partitioner可以自定义分区的实现方式,目前官方提供的有RangePartitionerHashPartitioner等。

  7. UnionRDD

    1
    class UnionRDD[T] extends RDD[T]

    UnionRDD一般通过union算子得到

    1
    2
    scala> val a5 = arr.union(arr2)
    a5: org.apache.spark.rdd.RDD[Int] = UnionRDD[7] at union at <console>:27
  8. CoalescedRDD

常见RDD Function

  1. PairRDDFunctions
    这个RDD被用来处理KV对,相比RDD,它提供了groupByKeyjoin等方法。以combineByKey为例,他有三个模板参数,从RDD过来的KV以及自己的C。相比reduce和fold系列的(V, V) => V,这多出来的C使combineByKey更灵活,通过combineByKey能够将V变换为C
    1
    2
    3
    4
    5
    6
    7
    8
    9
    def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = {
    //实现略
    }

Spark的架构概览

Spark在设计上的一个特点是它和下层的集群管理是分开的,一个Spark Application可以看做是由集群上的若干进程组成的。因此,我们需要区分Spark中的概念和下层集群中的概念,例如我们常见的Master和Worker是集群中的概念,表示节点;而Driver和Executor是Spark中的概念,表示进程。根据爆栈网,Driver可能位于某个Worker节点中,或者位于Master节点上,这取决于部署的方式

官网上给了这样一幅图,详细阐明了Spark集群下的基础架构。SparkContext是整个Application的管理核心,由Driver来负责管理。SparkContext负责管理所有的Executor,并且和下层的集群管理进行交互,以请求资源。

在Stage层次及以上接受DAGScheduler的调度,而TaskScheduler在Task层面进行调度。

在Spark on Yarn模式下,CoarseGrainedExecutorBackend和Executor一一对应,它是一个独立于Worker主进程之外的一个进程,我们可以jps查看到。而Task是作为一个Executor启动的一个线程来跑的,一个Executor中可以跑多个Task。在实现上,CoarseGrainedExecutorBackend继承了ExecutorBackend这个trait,作为一个IsolatedRpcEndpoint,维护Executor对象实例,并通过创建的DriverEndpoint实例的与Driver进行交互。在进程启动时,CoarseGrainedExecutorBackend调用onStart()方法向Driver注册自己,并产生一条"Connecting to driver的INFO。CoarseGrainedExecutorBackend通过DriverEndpoint.receive方法来处理来自Driver的命令,包括LaunchTaskKillTask等。这里注意一下,在scheduler中有一个CoarseGrainedSchedulerBackend,里面实现相似,在看代码时要注意区分开。

有关Executor和Driver的关系,下面这张图更加直观,需要说明的是,一个Worker上面也可能跑有多个Executor每个Task也可以在多个CPU核心上面运行

Spark上下文

在代码里我们操作一个Spark任务有两种方式,通过SparkContext,或者通过SparkSession

  1. SparkContext方式
    SparkContext是Spark自创建来一直存在的类。我们通过SparkConf直接创建SparkContext

    1
    2
    val sparkConf = new SparkConf().setAppName("AppName").setMaster("local")
    val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")
  2. SparkSession方式
    SparkSession是在Spark2.0之后提供的API,相比SparkContext,他提供了对SparkSQL的支持(持有SQLContext),例如createDataFrame等方法就可以通过SparkSession来访问。
    builder.getOrCreate()的过程中,虽然最终得到的是一个SparkSession,但实际上内部已经创建了一个SparkContext,并由这个SparkSession持有。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    val spark: SparkSession = SparkSession.builder() // 得到一个Builder
    .master("local").appName("AppName").config("spark.some.config.option", "some-value")
    .getOrCreate() // 得到一个SparkSession

    // SparkSession.scala
    val sparkContext = userSuppliedContext.getOrElse {
    val sparkConf = new SparkConf()
    options.foreach { case (k, v) => sparkConf.set(k, v) }

    // set a random app name if not given.
    if (!sparkConf.contains("spark.app.name")) {
    sparkConf.setAppName(java.util.UUID.randomUUID().toString)
    }

    SparkContext.getOrCreate(sparkConf)
    // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    applyExtensions(
    sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
    extensions)

    session = new SparkSession(sparkContext, None, None, extensions)

SparkEnv

SparkEnv持有一个Spark实例在运行时所需要的所有对象,包括Serializer、RpcEndpoint(在早期用的是Akka actor)、BlockManager、MemoryManager、BroadcastManager、SecurityManager、MapOutputTrackerMaster/Worker等等。SparkEnv由SparkContext创建,并在之后通过伴生对象SparkEnvget方法来访问。在创建时,Driver端的SparkEnv是SparkContext创建的时候调用SparkEnv.createDriverEnv创建的。Executor端的是其守护进程CoarseGrainedExecutorBackend创建的时候调用SparkEnv.createExecutorEnv方法创建的。这两个方法最后都会调用create方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Driver端
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

// Executor端
// CoarseGrainedExecutorBackend.scala
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()

// SparkEnv.scala
// create函数
val blockManager = new BlockManager(
executorId,
rpcEnv,
blockManagerMaster,
serializerManager,
conf, // Spark Conf对象
memoryManager,
mapOutputTracker,
shuffleManager,
blockTransferService,
securityManager,
externalShuffleClient)

Spark的任务调度

Spark的操作可以分为两种,Transform操作是lazy的,而Action操作是Eager的。每一个Action会产生一个Job。
Spark的Transform操作可以分为宽依赖(ShuffleDependency)和窄依赖(NarrowDependency)操作两种,其中窄依赖还有两个子类OneToOneDependencyRangeDependency。窄依赖操作表示父RDD的每个分区只被子RDD的一个分区所使用,例如unionmapfilter等的操作;而宽依赖恰恰相反。宽依赖需要shuffle操作,因为需要将父RDD的结果需要复制给不同节点用来生成子RDD,有关ShuffleDependency将在下面的Shuffle源码分析中详细说明。当DAG的执行中出现宽依赖操作时,Spark会将其前后划分为不同的Stage,在下一章节中将具体分析相关代码。

在Stage之下,就是若干个Task了。这些Task也就是Spark的并行单元,通常来说,按照当前Stage的最后一个RDD的分区数来计算,每一个分区都会启动一个Task来进行计算。我们可以通过rdd.partitions.size来获取一个RDD有多少个分区。

Task具有两种类型,ShuffleMapTaskResultTask。其中ResultTaskResultStage的Task,也就是最后一个Stage的Task。

Spark的存储管理

为了实现与底层细节的解耦,Spark的存储基于BlockManager给计算部分提供服务。类似于Driver和Executor,BlockManager机制也分为BlockManagerMaster和BlockManager。Driver上的BlockManagerMaster对于存在与Executor上的BlockManager统一管理。BlockManager只是负责管理所在Executor上的Block。
BlockManagerMaster和BlockManager都是在SparkEnv中创建的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Mapping from block manager id to the block manager's information.
val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
val blockManagerMaster = new BlockManagerMaster(
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(
rpcEnv,
isLocal,
conf,
listenerBus,
if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
externalShuffleClient
} else {
None
}, blockManagerInfo)),
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
conf,
isDriver)

val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(
executorId,
rpcEnv,
blockManagerMaster,
serializerManager,
conf,
memoryManager,
mapOutputTracker,
shuffleManager,
blockTransferService,
securityManager,
externalShuffleClient)

BlockInfoManager用来管理Block的元信息,主要维护一个infos。其中level项表示这个block的存储级别。

1
2
3
4
5
6
7
// BlockInfoManager.scala
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

private[storage] class BlockInfo(
val level: StorageLevel,
val classTag: ClassTag[_],
val tellMaster: Boolean) {

Spark提供了如下的持久化级别,其中选项为useDiskuseMemoryuseOffHeapdeserializedreplication,分别表示是否采用磁盘、内存、堆外内存、反序列化以及持久化维护的副本数。其中反序列化为false时(好绕啊),会对对象进行序列化存储,能够节省一定空间,但同时会消耗计算资源。需要注意的是,cache操作是persist的一个特例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object StorageLevel extends scala.AnyRef with scala.Serializable {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true) // 默认存储类别
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

Spark的内存管理

Spark Job执行流程分析

Job阶段

下面我们通过一个RDD上的Action操作count,查看Spark的Job是如何运行和调度的。特别注意的是,在SparkSQL中,Action操作有不同的执行流程,所以宜对比着看。count通过全局的SparkContext.runJob启动一个Job,这个函数转而调用DAGScheduler.runJobUtils.getIteratorSize实际上就是遍历一遍迭代器,以便统计count。

1
2
3
4
5
6
7
8
9
10
11
// RDD.scala
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
// Utils.scala
def getIteratorSize(iterator: Iterator[_]): Long = {
var count = 0L
while (iterator.hasNext) {
count += 1L
iterator.next()
}
count
}

在参数列表里面的下划线_的作用是将方法转为函数,而方法和函数的定义和区别可参考我的另一篇文章
下面查看runJob函数。比较有趣的是clean函数,它调用ClosureCleaner.clean方法,这个方法用来清理$outer域中未被引用的变量。因为我们要将闭包func序列化,并从Driver发送到Executor上面。序列化闭包的过程就是为每一个闭包生成一个可序列化类,在生成时,会将这个闭包所引用的外部对象也序列化。容易发现,如果我们为了使用外部对象的某些字段,而序列化整个对象,那么开销是很大的,因此通过clean来清除不需要的部分以减少序列化开销。此外,getCallSite用来生成诸如s"$lastSparkMethod at $firstUserFile:$firstUserLine"这样的字符串,它实际上会回溯调用栈,找到第一个不是在Spark包中的函数,即$lastSparkMethod,它是导致一个RDD创建的函数,比如各种Transform操作、sc.parallelize等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// SparkContext.scala
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
// CheckPoint机制
rdd.doCheckpoint()
}
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}

我们发现,传入的func只接受一个Iterator[_]参数,但是其形参声明却是接受TaskContextIterator[T]两个参数。这是为什么呢?这是因为runJob有不少重载函数,例如下面的这个

1
2
3
4
5
6
7
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

下面我们查看DAGScheduler.runJob函数,它实际上就是调用submitJob,然后等待Job执行的结果。由于Spark的DAGScheduler是基于事件循环的,它拥有一个DAGSchedulerEventProcessLoop类型的变量eventProcessLoop,不同的对象向它post事件,然后在它的onReceive循环中会依次对这些事件调用处理函数。
我们需要注意的是partitions不同于我们传入的rdd.partitions,前者是一个Array[Int],后者是一个Array[Partition]。并且在逻辑意义上,前者表示需要计算的partition,对于如first之类的Action操作来说,它只是rdd的所有partition的一个子集,我们将在稍后的submitMissingTasks函数中继续看到这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def runJob[T, U](...): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

// 下面就是在等了
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}

def submitJob[T, U](
rdd: RDD[T], // target RDD to run tasks on,就是被执行count的RDD
func: (TaskContext, Iterator[T]) => U, // 在RDD每一个partition上需要跑的函数
partitions: Seq[Int],
callSite: CallSite, // 被调用的位置
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// 检查是否在一个不存在的分区上创建一个Task
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions)}

// jobId是从后往前递增的
val jobId = nextJobId.getAndIncrement()
if (partitions.isEmpty) {
val time = clock.getTimeMillis()
// listenerBus是一个LiveListenerBus对象,从DAGScheduler构造时得到,用来做event log
// SparkListenerJobStart定义在SparkListener.scala文件中
listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
// 如果partitions是空的,那么就直接返回
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
// 我们向eventProcessLoop提交一个JobSubmitted事件
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
// DAGSchedulerEvent.scala
private[scheduler] case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent

下面我们具体看看对JobSubmitted的响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// DAGScheduler.scala
private[scheduler] def handleJobSubmitted(...) {
var finalStage: ResultStage = null
// 首先我们尝试创建一个`finalStage: ResultStage`,这是整个Job的最后一个Stage。
try {
// func: (TaskContext, Iterator[_]) => _
// 下面的语句是可能抛BarrierJobSlotsNumberCheckFailed或者其他异常的,
// 例如一个HadoopRDD所依赖的HDFS文件被删除了
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
...

// DAGScheduler.scala
private def createResultStage(...): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

这里createResultStage所返回的ResultStage继承了Stage类。Stage类有个rdd参数,对ResultStage而言就是finalRDD,对ShuffleMapStage而言就是ShuffleDependency.rdd

1
2
3
4
5
// DAGScheduler.scala
def createShuffleMapStage[K, V, C](
shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
...

下面我们来看看checkBarrierStageWithNumSlots这个函数,因为它会抛出BarrierJobSlotsNumberCheckFailed这个异常,被handleJobSubmitted捕获。这个函数主要是为了检测是否有足够的slots去运行所有的barrier task。屏障调度器是Spark为了支持深度学习在2.4.0版本所引入的一个特性。它要求在barrier stage中同时启动所有的Task,当任意的task执行失败的时候,总是重启整个barrier stage。这么麻烦是因为Spark希望能够在Task中提供一个barrier以供显式同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// DAGScheduler.scala
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
val numPartitions = rdd.getNumPartitions
val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
}
}

// DAGScheduler.scala
...
case e: BarrierJobSlotsNumberCheckFailed =>
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
// barrierJobIdToNumTasksCheckFailures是一个ConcurrentHashMap,表示对每个BarrierJob上失败的Task数量
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
(_: Int, value: Int) => value + 1)

...

if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}

case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
...

下面开始创建Job。ActiveJob表示在DAGScheduler里面运行的一个Job。
Job只负责向“叶子”Stage要结果,而之前Stage的运行是由DAGScheduler来调度的。这是因为若干Job可能共用同一个Stage的计算结果,所以将某个Stage强行归属到某个Job是不符合Spark设计逻辑的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// DAGScheduler.scala
...
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
// 在这里会打印四条日志,这个可以被用来在Spark.log里面定位事件
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

...

val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 从最后一个stage开始调用submitStage
submitStage(finalStage)
}

Stage阶段

Stage是如何划分的呢?又是如何计算Stage之间的依赖的?我们继续查看submitStage这个函数,对于一个Stage,首先调用getMissingParentStages来看看它的父Stage能不能直接用,也就是说这个Stage的rdd所依赖的所有父RDD能不能直接用,如果不行的话,就要先算父Stage的。在前面的论述里,我们知道,若干Job可能共用同一个Stage的计算结果,而不同的Stage也可能依赖同一个RDD。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def submitStage(stage: Stage) {
// 找到这个stage所属的job
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// 如果依赖之前的Stage,先列出来,并且按照id排序
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
// 运行这个Stage
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
// 先提交所有的parent stage
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}

下面具体查看getMissingParentStages这个函数,可以看到,Stage的计算链是以最后一个RDD为树根逆着向上遍历得到的,而这个链条的终点要么是一个ShuffleDependency,要么是一个所有分区都被缓存了的RDD。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
// 这里是个**DFS**,栈是手动维护的,主要是为了防止爆栈
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]): Unit = {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// 如果这个RDD有没有被缓存的Partition,那么它就需要被计算
for (dep <- rdd.dependencies) {
// 我们检查这个RDD的所有依赖
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
// 我们发现一个宽依赖,因此我们创建一个新的Shuffle Stage,并加入到missing中(如果不存在)
// 由于是宽依赖,所以我们不需要向上找了
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
// 如果是一个窄依赖,就加入到waitingForVisit中
// prepend是在头部加,+=是在尾部加
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.remove(0))
}
missing.toList
}

Task阶段

下面是重头戏submitMissingTasks,这个方法负责生成TaskSet,并且将它提交给TaskScheduler低层调度器。
partitionsToCompute计算有哪些分区是待计算的。根据Stage类型的不同,findMissingPartitions的计算方法也不同。对于ResultStage来说,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...

// ResultStage.scala
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
// ActiveJob.scala
val numPartitions = finalStage match {
// 对于ResultStage,不一定得到当前rdd的所有分区,例如first()和lookup()的Action,
// 因此这里是r.partitions而不是r.rdd.partitions
case r: ResultStage => r.partitions.length
case m: ShuffleMapStage => m.rdd.partitions.length
}

// ShuffleMapStage.scala
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}

// MapOutputTrackerMaster.scala
def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {
shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
}

这个outputCommitCoordinator是由SparkEnv维护的OutputCommitCoordinator对象,它决定到底谁有权利向HDFS写数据。在Executor上的请求会通过他持有的Driver的OutputCommitCoordinatorEndpoint的引用发送给Driver处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// DAGScheduler.scala
...
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties

runningStages += stage
// 在检测Tasks是否serializable之前,就要SparkListenerStageSubmitted,
// 如果不能serializable,那就在这**之后**给一个SparkListenerStageCompleted

stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
...

getPreferredLocs计算每个分区的最佳计算位置,它实际上是调用getPreferredLocsInternal这个函数。这个函数是一个关于visit: HashSet[(RDD[_], Int)]的递归函数,visit用(rdd, partition)元组唯一描述一个分区。getPreferredLocs的计算逻辑是这样的:

  1. 如果已经visit过了,就返回Nil
  2. 如果是被cached的,通过getCacheLocs返回cache的位置
  3. 如果RDD有自己的偏好位置,例如输入RDD,那么使用rdd.preferredLocations返回它的偏好位置
  4. 如果还没返回,但RDD有窄依赖,那么遍历它的所有依赖项,返回第一个具有位置偏好的依赖项的值

理论上,一个最优的位置选取应该尽可能靠近数据源以减少网络传输,但目前版本的Spark还没有实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// DAGScheduler.scala
...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
// 如果有非致命异常就创建一个新的Attempt,并且abortStage(这还不致命么)
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
...

下面,我们开始attempt这个Stage,我们需要将taskBinaryBytes通过closureSerializer打包,然后广播。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// DAGScheduler.scala
...
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

// 如果没有Task要执行,实际上就是skip了,那么就没有Submission Time这个字段
if (partitionsToCompute.nonEmpty) {
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
// taskBinaryBytes and partitions are both effected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
// consistent view of both variables.
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
// 注意这里的stage.func已经被ClosureCleaner清理过了
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

partitions = stage.rdd.partitions
}

if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
logWarning(s"Broadcasting large task binary with size " +
s"${Utils.bytesToString(taskBinaryBytes.length)}")
}
// 广播
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage

// Abort execution
return
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage

// Abort execution
return
}

下面,我们根据Stage的类型生成Task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// DAGScheduler.scala
...
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}

case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
...
}

我们将生成的tasks包装成一个TaskSet,并且提交给taskScheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DAGScheduler.scala
...
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)

stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
submitWaitingChildStages(stage)
}
}

Shuffle

在Stage和Stage之间,Spark需要Shuffle数据。这个流程包含上一个Stage上的Shuffle Write,中间的数据传输,以及下一个Stage的Shuffle Read。如下图所示

Shuffle类操作常常发生在宽依赖的RDD之间,这类算子需要将多个节点上的数据拉取到同一节点上进行计算,其中存在大量磁盘IO、序列化和网络传输开销,它们可以分为以下几点来讨论。
当Spark中的某个节点故障之后,常常需要重算RDD中的某几个分区。对于窄依赖而言,父RDD的一个分区只对应一个子RDD分区,因此丢失子RDD的分区,重算整个父RDD分区是必要的。而对于宽依赖而言,父RDD会被多个子RDD使用,而可能当前丢失的子RDD只使用了父RDD中的某几个分区的数据,而我们仍然要重新计算整个父RDD,这造成了计算资源的浪费。
当使用Aggregate类(如groupByKey)或者Join类这种Shuffle算子时,如果选择的key上的数据是倾斜(skew)的,会导致部分节点上的负载增大。对于这种情况除了可以增加Executor的内存,还可以重新选择分区函数(例如在之前的key上加盐)来平衡分区。
Shuffle Read操作容易产生OOM,其原因是尽管在BlockStoreShuffleReader中会产生外部排序的resultIter,但在这之前,ExternalAppendOnlyMap先要从BlockManager拉取数据(k, v)到自己的currentMap中,如果这里的v很大,那么就会导致Executor的OOM问题。可以从PairRDDFunctions的文档中佐证这一点。在Dataset中并没有reduceByKey,原因可能与Catalyst Optimizer的优化有关,但考虑到groupByKey还是比较坑的,感觉这个举措并不明智。

Shuffle考古

在Spark0.8版本前,Spark只有Hash Based Shuffle的机制。在这种方式下,假定Shuffle Write阶段(有的也叫Map阶段)有W个Task,在Shuffle Read阶段(有的也叫Reduce阶段)有R个Task,那么就会产生W*R个文件。这样的坏处是对文件系统产生很大压力,并且IO也差(随机读写)。由于这些文件是先全量在内存里面构造,再dump到磁盘上,所以Shuffle在Write阶段就很可能OOM。
为了解决这个问题,在Spark 0.8.1版本加入了File Consolidation,以求将W个Task的输出尽可能合并。现在,Executor上的每一个执行单位都生成自己独一份的文件。假定总共有C个核心,每个Task占用T个核心,那么总共有C/T个执行单位。考虑极端情况,如果C==T,那么任务实际上是串行的,所以写一个文件就行了。因此,最终会生成C/T*R个文件。
但这个版本仍然没有解决OOM的问题。虽然对于reduce这类操作,比如count,因为是来一个combine一个,所以只要你的V不是数组,也不想强行把结果concat成一个数组,一般都没有较大的内存问题。但是考虑如果我们执行groupByKey这样的操作,在Read阶段每个Task需要得到得到自己负责的key对应的所有value,而我们现在每个Task得到的是若干很大的文件,这个文件里面的key是杂乱无章的。如果我们需要得到一个key对应的所有value,那么我们就需要遍历这个文件,将key和对应的value全部存放在一个结构比如HashMap中,并进行合并。因此,我们必须保证这个HashMap足够大。既然如此,我们很容易想到一个基于外部排序的方案,我们为什么不能对key进行外排呢?确实在Hadoop MapReduce中会做归并排序,因此Reducer侧的数据按照key组织好的了。但Spark在这个版本没有这么做,并且Spark在下一个版本就这么做了。
在Spark 0.9版本之后,引入了ExternalAppendOnlyMap,通过这个结构,SparkShuffle在combine的时候如果内存不够,就能Spill到磁盘,并在Spill的时候进行排序。当然,内存还是要能承载一个KV的,我们将在稍后的源码分析中深入研究这个问题。

终于在Spark1.1版本之后引入了Sorted Based Shuffle。此时,Shuffle Write阶段会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,Shuffle Read的Task可以通过该索引文件获取相关的数据。

Shuffle Read端源码分析

Shuffle Read一般位于一个Stage的开始,这时候上一个Stage会给我们留下一个ShuffledRDD。在它的compute方法中会首先取出shuffleManager: ShuffleManagerShuffleManager是一个Trait,它的两个实现就是org.apache.spark.shuffle.hash.HashShuffleManager
org.apache.spark.shuffle.sort.SortShuffleManager

1
2
3
4
5
6
7
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
SparkEnv.get.shuffleManager // 由SparkEnv维护的ShuffleManager
.getReader(dep.shuffleHandle, split.index, split.index + 1, context, metrics) // 返回一个BlockStoreShuffleReader
.read().asInstanceOf[Iterator[(K, C)]]
}

ShuffleManager通过调用BlockStoreShuffleReader.read返回一个迭代器Iterator[(K, C)]。在BlockStoreShuffleReader.read方法中,首先得到一个ShuffleBlockFetcherIterator

1
2
3
4
5
6
// BlockStoreShuffleReader.scala
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
...
) // 返回一个ShuffleBlockFetcherIterator
.toCompletionIterator // 返回一个Iterator[(BlockId, InputStream)]

ShuffleBlockFetcherIteratorfetchUpToMaxBytes()fetchLocalBlocks()分别读取remote和local的Block。在拉取远程数据的时候,会统计bytesInFlightreqsInFlight等信息,并使用maxBytesInFlightmaxReqsInFlight节制。同时,为了允许5个并发同时拉取数据,还会设置targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)去请求每次拉取数据的最大大小。通过ShuffleBlockFetcherIterator.splitLocalRemoteBytes,现在改名叫partitionBlocksByFetchMode函数,可以将blocks分为Local和Remote的。关于两个函数的具体实现,将单独讨论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val serializerInstance = dep.serializer.newInstance()

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}

// Update the context task metrics for each record read.
// CompletionIterator相比普通的Iterator的区别就是在结束之后会调用一个completion函数
// CompletionIterator通过它伴生对象的apply方法创建,传入第二个参数即completionFunction
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())

// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
...

经过一系列转换,我们得到一个interruptibleIter。接下来,根据是否有mapSideCombine对它进行聚合。这里的dep来自于BaseShuffleHandle对象,它是一个ShuffleDependency。在前面Spark的任务调度中已经提到,ShuffleDependency就是宽依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// BlockStoreShuffleReader.scala
...
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// We are reading values that are already combined
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// We don't know the value type, but also don't care -- the dependency *should*
// have made sure its compatible w/ this aggregator, which will convert the value
// type to the combined type C
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}

这里的aggregatorAggregator[K, V, C],这里的KVC与熟悉combineByKey的是一样的。需要注意的是,在combine的过程中借助了ExternalAppendOnlyMap,这是之前提到的在Spark 0.9中引入的重要特性。通过调用insertAll方法能够将interruptibleIter内部的数据添加到ExternalAppendOnlyMap中,并在之后更新MemoryBytesSpilled、DiskBytesSpilled、PeakExecutionMemory三个统计维度,这也是我们在Event Log中所看到的统计维度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Aggregator.scala
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

def combineValuesByKey(
iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}

def combineCombinersByKey(
iter: Iterator[_ <: Product2[K, C]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}

/** Update task metrics after populating the external map. */
private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
Option(context).foreach { c =>
c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
}
}
}

在获得Aggregate迭代器之后,最后一步,我们要进行排序,这时候就需要用到ExternalSorter这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// BlockStoreShuffleReader.scala
...
val resultIter = dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => {
sorter.stop()
})
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}

ExternalAppendOnlyMap和AppendOnlyMap

我们查看ExternalAppendOnlyMap的实现。ExternalAppendOnlyMap拥有一个currentMap管理在内存中存储的键值对们。和一个DiskMapIterator的数组spilledMaps,表示Spill到磁盘上的键值对们。

1
2
@volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]

先来看currentMap,它是一个SizeTrackingAppendOnlyMap。这个东西实际上就是一个AppendOnlyMap,不过给它加上了统计数据大小的功能,主要是借助于SizeTrackerafterUpdateresetSamples两个方法。我们知道非序列化对象在内存存储上是不连续的,我们需要通过遍历迭代器才能知道对象的具体大小,而这个开销是比较大的。因此,通过SizeTracker我们可以得到一个内存空间占用的估计值,从来用来判定是否需要Spill。
下面,我们来看currentMap.insertAll这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// AppendOnlyMap.scala
def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
if (currentMap == null) {
throw new IllegalStateException(
"Cannot insert new elements into a map after calling iterator")
}
// 我们复用update函数,从而避免每一次都创建一个新的闭包(编程环境这么恶劣的么。。。)
var curEntry: Product2[K, V] = null
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal)
// 如果不是第一个V,就merge
// mergeValue: (C, V) => C,
mergeValue(oldVal, curEntry._2)
else
// 如果是第一个V,就新建一个C
// createCombiner: V => C,
createCombiner(curEntry._2)
}

while (entries.hasNext) {
curEntry = entries.next()
val estimatedSize = currentMap.estimateSize()
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
if (maybeSpill(currentMap, estimatedSize)) {
// 如果发生了Spill,就重新创建一个currentMap
currentMap = new SizeTrackingAppendOnlyMap[K, C]
}

// key: K, updateFunc: (Boolean, C) => C
currentMap.changeValue(curEntry._1, update)
addElementsRead()
}
}

可以看出,在insertAll中主要做了两件事情:

  1. 遍历curEntry <- entries,并通过传入的update函数进行Combine
    在内部存储上,AppendOnlyMap,包括后面将看到的一些其他KV容器,都倾向于将(K, V)对放到哈希表的相邻两个位置,这样的好处应该是避免访问时再进行一次跳转。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    // AppendOnlyMap.scala

    // 这里的nullValue和haveNullValue是用来单独处理k为null的情况的,下面会详细说明
    private var haveNullValue = false
    // 有关null.asInstanceOf[V]的花里胡哨的语法,详见 https://stackoverflow.com/questions/10749010/if-an-int-cant-be-null-what-does-null-asinstanceofint-mean
    private var nullValue: V = null.asInstanceOf[V]

    def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    // updateFunc就是从insertAll传入的update
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
    if (!haveNullValue) {
    // 如果这时候还没有null的这个key,就新创建一个
    incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
    // 乘以2的原因是他按照K1 V1 K2 V2这样放的
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
    // 如果对应的key不存在,就新创建一个
    // 这也是为什么前面要单独处理null的原因,这里的null被用来做placeholder了
    // 可以看到,第一个参数传的false,第二个是花里胡哨的null
    val newValue = updateFunc(false, null.asInstanceOf[V])
    data(2 * pos) = k
    data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
    incrementSize()
    return newValue
    } else if (k.eq(curKey) || k.equals(curKey)) { // 又是从Java继承下来的花里胡哨的特性
    val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
    data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
    return newValue
    } else {
    // 再散列
    val delta = i
    pos = (pos + delta) & mask
    i += 1
    }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
    }
  2. 估计currentMap的当前大小,并调用currentMap.maybeSpill向磁盘Spill
    我们将在单独的章节论述SizeTracker如何估计集合大小,先看具体的Spill过程,可以梳理出shouldSpill==true的情况
    1、 elementsRead % 32 == 0
    2、 currentMemory >= myMemoryThreshold
    3、 通过acquireMemory请求的内存不足以扩展到2 * currentMemory的大小

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // Spillable.scala
    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    ...

    // MemoryConsumer.scala
    public long acquireMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, this);
    used += granted;
    return granted;
    }

    下面就是真正Spill的过程了,其实就是调用spill函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // Spillable.scala
    ...
    // Actually spill
    if (shouldSpill) {
    _spillCount += 1 // 统计Spill的次数
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0 // 重置强制Spill计数器_elementsRead
    // 这里就是我们在Event Log里面看到的Memory Spill的统计量
    // 他表示在Spill之后我们能够释放多少内存
    _memoryBytesSpilled += currentMemory
    releaseMemory()
    }
    shouldSpill
    }

insertAll之后,会返回一个迭代器,我们查看相关方法。可以发现如果spilledMaps都是空的,也就是没有Spill的话,就返回内存里面currentMapiterator,否则就返回一个ExternalIterator
对于第一种情况,会用SpillableIterator包裹一下。这个类在很多地方有定义,包括ExternalAppendOnlyMap.scalaExternalSorter.scala里面。在当前使用的实现中,它实际上就是封装了一下Iterator,使得能够spill,转换成CompletionIterator等。
对于第二种情况,ExternalIterator比较有趣,将在稍后进行讨论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ExternalAppendOnlyMap.scala
override def iterator: Iterator[(K, C)] = {
...
if (spilledMaps.isEmpty) {
// 如果没有发生Spill
destructiveIterator(currentMap.iterator)
} else {
// 如果发生了Spill
new ExternalIterator()
}
}

def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
readingIterator = new SpillableIterator(inMemoryIterator)
readingIterator.toCompletionIterator
}

currentMap.iterator实际上就是一个朴素无华的迭代器的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AppendOnlyMap.scala
def nextValue(): (K, V) = {
if (pos == -1) { // Treat position -1 as looking at the null value
if (haveNullValue) {
return (null.asInstanceOf[K], nullValue)
}
pos += 1
}
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
}
pos += 1
}
null
}

ExternalSorter

ExternalSorter的作用是对输入的(K, V)进行排序,以产生新的(K, C)对,排序过程中可选择进行combine,否则输出的C == V。需要注意的是ExternalSorter不仅被用在Shuffle Read端,也被用在了Shuffle Write端,所以在后面会提到Map-side combine的概念。ExternalSorter具有如下的参数,在给定ordering之后,ExternalSorter就会按照它来排序。在Spark源码中建议如果希望进行Map-side combining的话,就指定ordering,否则就可以设置orderingnull

1
2
3
4
5
6
7
private[spark] class ExternalSorter[K, V, C](
context: TaskContext,
aggregator: Option[Aggregator[K, V, C]] = None,
partitioner: Option[Partitioner] = None,
ordering: Option[Ordering[K]] = None,
serializer: Serializer = SparkEnv.get.serializer)
extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())

由于ExternalSorter支持有combine和没有combine的两种模式,因此对应设置了两个对象。map = new PartitionedAppendOnlyMap[K, C],以及buffer = new PartitionedPairBuffer[K, C]。其中,PartitionedAppendOnlyMap就是一个SizeTrackingAppendOnlyMapPartitionedPairBuffer则继承了WritablePartitionedPairCollection,由于不需要按照key进行combine,所以它的实现接近于一个Array。
ExternalSorter.insertAll方法和之前看到的ExternalAppendOnlyMap方法是大差不差的,他也会对可以聚合的特征进行聚合,并且TODO上还说如果聚合之后的reduction factor不够明显,就停止聚合。

相比之前的aggregator,ExternalSorter不仅能aggregate,还能sort。所以为啥不直接搞一个ExternalSorter而是还要在前面垫一个ExternalAppendOnlyMap呢?

ExternalIterator

下面我们来看ExternalAppendOnlyMapExternalIterator的实现。它是一个典型的外部排序的实现,有一个PQ用来merge。不过这次的迭代器换成了destructiveSortedIterator,也就是我们都是排序的了。这个道理也是显而易见的,不sort一下,我们怎么和硬盘上的数据做聚合呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ExternalAppendOnlyMap.scala
val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
val sortedMap = destructiveIterator(currentMap.destructiveSortedIterator(keyComparator))
// 我们得到一个Array的迭代器
val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = new ArrayBuffer[(K, C)]
// 读完所有具有所有相同hash(key)的序列,并创建一个StreamBuffer
// 需要注意的是,由于哈希碰撞的原因,里面可能有多个key
readNextHashCode(it, kcPairs)
if (kcPairs.length > 0) {
mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
}
}

我们先来看看destructiveSortedIterator的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AppendOnlyMap.scala
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
var keyIndex, newIndex = 0
// 下面这个循环将哈希表里面散乱的KV对压缩到最前面
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

// 这下面和前面实现大差不差,就省略了
new Iterator[(K, V)] {
...
}
}

下面我们来看看实现的next()接口函数,它是外部排序中的一个典型的归并过程。我们需要注意的是minBuffer是一个StreamBuffer,维护一个hash(K), VArrayBuffer,类似H1 V1 H1 V2 H2 V3这样的序列,而不是我们想的(K, V)流。因此其中是可能有哈希碰撞的。我们从mergeHeapdequeue出来的StreamBuffer是当前hash(K)最小的所有K的集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
override def next(): (K, C) = {
if (mergeHeap.isEmpty) {
// 如果堆是空的,就再见了
throw new NoSuchElementException
}
// Select a key from the StreamBuffer that holds the lowest key hash
// mergeHeap选择所有StreamBuffer中最小hash的,作为minBuffer
val minBuffer = mergeHeap.dequeue()
// minPairs是一个ArrayBuffer[T],表示这个StreamBuffer维护的所有KV对
val minPairs = minBuffer.pairs
val minHash = minBuffer.minKeyHash
// 从一个ArrayBuffer[T]中移出Index为0的项目
val minPair = removeFromBuffer(minPairs, 0)
// 得到非哈希的 (minKey, minCombiner)
val minKey = minPair._1
var minCombiner = minPair._2
assert(hashKey(minPair) == minHash)

// For all other streams that may have this key (i.e. have the same minimum key hash),
// merge in the corresponding value (if any) from that stream
val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
while (mergeHeap.nonEmpty && mergeHeap.head.minKeyHash == minHash) {
val newBuffer = mergeHeap.dequeue()
// 如果newBuffer的key和minKey相等的话(考虑哈希碰撞),就合并
minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
mergedBuffers += newBuffer
}

// Repopulate each visited stream buffer and add it back to the queue if it is non-empty
mergedBuffers.foreach { buffer =>
if (buffer.isEmpty) {
readNextHashCode(buffer.iterator, buffer.pairs)
}
if (!buffer.isEmpty) {
mergeHeap.enqueue(buffer)
}
}

(minKey, minCombiner)
}

SizeTracker

首先在每次集合更新之后,会调用afterUpdate,当到达采样的interval之后,会takeSample

1
2
3
4
5
6
7
// SizeTracker.scala
protected def afterUpdate(): Unit = {
numUpdates += 1
if (nextSampleNum == numUpdates) {
takeSample()
}
}

takeSample函数中第一句话就涉及多个对象,一个一个来看。

1
2
3
4
// SizeTracker.scala
private def takeSample(): Unit = {
samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
...

SizeEstimator.estimate的实现类似去做一个state队列上的BFS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
val state = new SearchState(visited)
state.enqueue(obj)
while (!state.isFinished) {
visitSingleObject(state.dequeue(), state)
}
state.size
}

private def visitSingleObject(obj: AnyRef, state: SearchState): Unit = {
val cls = obj.getClass
if (cls.isArray) {
visitArray(obj, cls, state)
} else if (cls.getName.startsWith("scala.reflect")) {
// Many objects in the scala.reflect package reference global reflection objects which, in
// turn, reference many other large global objects. Do nothing in this case.
} else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
// Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
// the size estimator since it references the whole REPL. Do nothing in this case. In
// general all ClassLoaders and Classes will be shared between objects anyway.
} else {
obj match {
case s: KnownSizeEstimation =>
state.size += s.estimatedSize
case _ =>
val classInfo = getClassInfo(cls)
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
}
}
}
}

然后我们创建一个Sample,并且放到队列samples

1
2
3
private object SizeTracker {
case class Sample(size: Long, numUpdates: Long)
}

下面的主要工作就是计算一个bytesPerUpdate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  ...
// Only use the last two samples to extrapolate
// 如果sample太多了,就删除掉一些
if (samples.size > 2) {
samples.dequeue()
}
val bytesDelta = samples.toList.reverse match {
case latest :: previous :: tail =>
(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
// If fewer than 2 samples, assume no change
case _ => 0
}
bytesPerUpdate = math.max(0, bytesDelta)
nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}

我们统计到上次估算之后经历的update数量,并乘以bytesPerUpdate,即可得到总大小

1
2
3
4
5
6
// SizeTracker.scala
def estimateSize(): Long = {
assert(samples.nonEmpty)
val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
(samples.last.size + extrapolatedDelta).toLong
}

Shuffle Write端源码分析

Shuffle Write端的实现主要依赖ShuffleManager中的ShuffleWriter对象,目前使用的ShuffleManagerSortShuffleManager,因此只讨论它。它是一个抽象类,主要有SortShuffleWriterUnsafeShuffleWriterBypassMergeSortShuffleWriter等实现。

SortShuffleWriter

1
2
3
4
5
6
7
8
private[spark] abstract class ShuffleWriter[K, V] {
/** Write a sequence of records to this task's output */
@throws[IOException]
def write(records: Iterator[Product2[K, V]]): Unit

/** Close this writer, passing along whether the map completed */
def stop(success: Boolean): Option[MapStatus]
}

SortShuffleWriter的实现可以说很简单了,就是将records放到一个ExternalSorter里面,然后创建一个ShuffleMapOutputWritershuffleExecutorComponents实际上是一个LocalDiskShuffleExecutorComponentsShuffleMapOutputWriter是一个Java接口,实际上被创建的是LocalDiskShuffleMapOutputWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// 如果不需要进行mapSideCombine,那么我们传入空的aggregator和ordering,
// 我们在map端不负责对key进行排序,统统留给reduce端吧
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
...

紧接着,调用ExternalSorter.writePartitionedMapOutput将自己维护的map或者buffer(根据是否有Map Side Aggregation)写到mapOutputWriter提供的partitionWriter里面。其过程用到了一个destructiveSortedWritablePartitionedIterator的迭代器,相比destructiveSortedIterator,它是多了Writable和Partitioned两个词。前者的意思是我可以写到文件,后者的意思是我先按照partitionId排序,然后在按照给定的Comparator排序。
接着就是commitAllPartitions,这个函数负责创建一个索引文件,并原子地提交。注意到,到当前版本,每一个执行单元只会生成一份数据文件和一份索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  ...
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

// LocalDiskShuffleMapOutputWriter.java
@Override
public long[] commitAllPartitions() throws IOException {
...
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
return partitionLengths;
}

// IndexShuffleBlockResolver.java
def writeIndexFileAndCommit(shuffleId: Int, mapId: Long, lengths: Array[Long], dataTmp: File): Unit
override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer

写完的索引文件将会被getBlockData函数查阅,从而知道每个block的开始与结束位置。这个getBlockData同样位于IndexShuffleBlockResolver类中。它们继承了ShuffleBlockResolver这个trait,用定义如何从一个logical shuffle block identifier(例如map、reduce或shuffle)中。这个类维护Block和文件的映射关系,维护index文件,向BlockStore提供抽象。

BypassMergeSortShuffleWriter

下面我们来看看BypassMergeSortShuffleWriter的实现。它到底Bypass了什么东西呢?其实是sort和aggregate。

fetchLocalBlocks和fetchUpToMaxBytes的实现

简单说明一下fetchLocalBlocksfetchUpToMaxBytes的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ShuffleBlockFetcherIterator.scala
private[this] val localBlocks = scala.collection.mutable.LinkedHashSet[BlockId]()
private[this] def fetchLocalBlocks() {
logDebug(s"Start fetching local blocks: ${localBlocks.mkString(", ")}")
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
...

// BlockManager.scala
override def getBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
// 需要通过ShuffleBlockResolver来获取
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
getLocalBytes(blockId) match {

Spark分布式部署方式

Spark自有部署方式

最常用的其实是单机模式也就是spark-submit --master local,这里local是默认选项。在程序执行过程中,只会生成一个SparkSubmit进程,不会产生Master和Worker节点,也不依赖Hadoop。当然,Windows里面可能需要winutils这个工具的,但也是直接下载,而不需要装Hadoop。
在集群化上,Spark可以部署在On Yarn和On Mesos、K8S和Standalone上面,而又分别对应了Cluster和Client两种deploy mode。

首先是Spark自带Cluster Manager的Standalone Client模式,也是我们最常用的集群测试模式,需要启动Master和Slave节点,但仍然不依赖Hadoop。

1
./bin/spark-submit --master spark://localhost:7077  --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.11-2.4.4.jar 100

下面一种是Spark自带Cluster Manager的Standalone Cluster模式,一字之差,还是有不同的,用下面的方式启动

1
./bin/spark-submit --master spark://wl1:6066 --deploy-mode cluster # 默认cluster

上面两种的配置一般修改Spark的spark-defaults.conf和spark-env.sh也就可以了,不涉及hadoop

Yarn

Spark跑在yarn上面,这个还依赖hadoop集群,但Spark不需要自己提供Master和Worker了。Yarn同样提供了Cluster和Client两种模式,如下所示

1
2
./bin/spark-submit --master yarn-cluster
./bin/spark-submit --master yarn-client

Yarn Cluster就是通常使用的部署方式,此时Spark Driver是运行在Yarn的ApplicationMaster上的,而Client方式的Driver是在任务提交机上面运行,ApplicationMaster只负责向ResourceManager申请Executor需要的资源

我们在Spark的WebUI中常常看到诸如Container、ApplicationMaster、ResourceMaster、NodeMaster这些东西,其实他们都是Yarn里面的常见概念。具体的联系在下面的图上一目了然。

对应到Spark中,每个SparkContext对应一个ApplicationMaster,每个Executor对应一个Container

SparkSQL

打开项目源码根目录,SparkSQL由4个项目组成,分别为Spark Core、Spark Catalyst、Spark Hive和Spark Hive ThriftServer。我们主要和前两者打交道,Core中就是SparkSQL的核心,包括Dataset等类的实现。Catalyst是Spark的水晶优化器。

DataFrame和Dataset

我们可以将RDD看为一个分布式的容器M[T],我们对T是未知的。而事实上我们处理数据集往往就是个来自HBase或者其他数据仓库大宽表。如果使用RDD会导致很多的拆箱和装箱的操作。并且由于T是一个黑盒,Spark也很难对RDD的计算进行优化。为此,Spark推出了SparkSQL来解决这个问题。而SparkSQL的一个核心机制就是DataFrame和Dataset。由于DataFrame可以看做一个Dataset[Row],所以,我们主要以Dataset来研究对象。

从RDD到DF/DS

RDD可以通过下面代码中的一个隐式转换 得到一个DatasetHolder,接着借助于DatasetHolder中提供的toDStoDF来实现到DataFrameDataset的转换。

1
2
3
implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
DatasetHolder(_sqlContext.createDataset(rdd))
}

其实在上面文件里面还定义了一系列隐式转换所需要的Encoder,例如对于大多数的case class都需要调用newProductArrayEncoder。有关这部分的进一步说明,可以查看文章

同样,从Dataset/DataFrame到RDD可以通过调用.rdd方法来轻松得到。不过这个操作是Action么?在爆栈网上有相关讨论1,认为不是Action但有开销;和相关讨论2,认为是无开销的。我们查看具体代码

1
2
3
4
5
6
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}

从DF到DS

从DS到DF

1
2
3
4
5
6
7
8
9
10
11
12
@scala.annotation.varargs
def toDF(colNames: String*): DataFrame = {
require(schema.size == colNames.size,
"The number of columns doesn't match.\n" +
s"Old column names (${schema.size}): " + schema.fields.map(_.name).mkString(", ") + "\n" +
s"New column names (${colNames.size}): " + colNames.mkString(", "))

val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
Column(oldAttribute).as(newName)
}
select(newCols : _*)
}

Dataset算子

在Dataset中,同样提供了诸如map之类的算子,不过它们的实现是从Dataset和DataFrame之间的变换了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
MapElements[T, U](func, logicalPlan)
}

object MapElements {
def apply[T : Encoder, U : Encoder](
func: AnyRef,
child: LogicalPlan): LogicalPlan = {
val deserialized = CatalystSerde.deserialize[T](child)
val mapped = MapElements(
func,
implicitly[Encoder[T]].clsTag.runtimeClass,
implicitly[Encoder[T]].schema,
CatalystSerde.generateObjAttr[U],
deserialized)
CatalystSerde.serialize[U](mapped)
}
}

case class MapElements(
func: AnyRef,
argumentClass: Class[_],
argumentSchema: StructType,
outputObjAttr: Attribute,
child: LogicalPlan) extends ObjectConsumer with ObjectProducer

object TypedFilter {
def apply[T : Encoder](func: AnyRef, child: LogicalPlan): TypedFilter = {
TypedFilter(
func,
implicitly[Encoder[T]].clsTag.runtimeClass,
implicitly[Encoder[T]].schema,
UnresolvedDeserializer(encoderFor[T].deserializer),
child)
}
}

SparkSQL的上下文

SparkSQL的上下文通过SQLContext维护,它由一个SparkSession持有,并指向其所有者,以及所有者维护的SparkContext。在Spark 2.0之后,大部分SparkSQL的逻辑工作被迁移到了SparkSession中,所以这个类可以被看做是一个兼容性的封装。

1
2
3
4
5
class SQLContext private[sql](val sparkSession: SparkSession)
extends Logging with Serializable {
private[sql] def sessionState: SessionState = sparkSession.sessionState
private[sql] def sharedState: SharedState = sparkSession.sharedState
private[sql] def conf: SQLConf = sessionState.conf

SparkSQL的解析流程

首先会对SQL进行Parse,得到一个Unresolved LogicalPlan。这里Unresolved的意思是诸如变量名和表名这些东西是不确定的,需要在Analyzer的阶段借助于Catalog来决议得到LogicalPlan。Catalog就是描述了SQLContext里面的诸如表之类的对象。
通过Analyzer,将Unresolved LogicalPlan决议为Logical Plan。
Logical Plan继承了QueryPlan[LogicalPlan],是一棵AST。SparkSQL的执行目标就是树根的值,在计算过程中,父节点的计算依赖于子节点的计算结果。LogicalPlan又拥有三个子类BinaryNode/UnaryNodeLeafNode,然后有产生了OrderPreservingUnaryNode等子类。这些Node被另一些子类所继承,这些basicLogicalOperators描述了包括Project/Filter/Sample/Union/Join/Limit等操作
通过Optimizer,对Logical Plan进行优化。Catalyst主要做的是RBO,但诸如华为等公司和机构也有提出过CBO的方案。
优化后的Optimized Logical Plan会生成为Physical Plan。Physical Plan在Spark中的对应代码实现是SparkPlan,他同样继承了QueryPlan[SparkPlan]

Spark SQL的执行流程

和RDD一样,Dataset同样只在Action操作才会计算,我们选取最典型的count()来研究。可以看到,count操作实际上会执行plan.executeCollect(),而这里的plan是一个SparkPlanqe是一个QueryExecution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Dataset.scala
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}

private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
SQLExecution.withNewExecutionId(sparkSession, qe, Some(name)) {
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
action(qe.executedPlan)
}
}

private def withNewExecutionId[U](body: => U): U = {
SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
}

QueryExecution用来描述整个SQL执行的上下文,从如下示例中可以看出,它维护了从Unsolved Logical Plan到Physical Plan的整个转换流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
scala> val ds = Seq(Person("Calvin", 22, 1)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint ... 1 more field]

scala> val fds = ds.filter(p => p.age>1)
fds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint ... 1 more field]

scala> ds.queryExecution
res9: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
LocalRelation [name#3, age#4L, money#5L]

== Analyzed Logical Plan ==
name: string, age: bigint, money: bigint
LocalRelation [name#3, age#4L, money#5L]

== Optimized Logical Plan ==
LocalRelation [name#3, age#4L, money#5L]

== Physical Plan ==
LocalTableScan [name#3, age#4L, money#5L]
```Scala
那么,`count()`做的就是对`qe`做一些手脚,然后调用`qe.executedPlan.executeCollect().head.getLong(0)`。于是我们查看`executeCollect()`这个方法,他实际上就是execute和collect两部分。execute部分实际上是对`getByteArrayRdd`的一个调用。而`collect`部分就是调用`byteArrayRdd.collect()`,从而得到一个`ArrayBuffer[InternalRow]`
```Scala
// SparkPlan.scala
def executeCollect(): Array[InternalRow] = {
// byteArrayRdd是一个RDD[(Long, Array[Byte])]
val byteArrayRdd = getByteArrayRdd()

val results = ArrayBuffer[InternalRow]()
byteArrayRdd.collect().foreach { countAndBytes =>
decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
}
results.toArray
}

getByteArrayRdd的作用是将一系列UnsafeRow打包成一个Array[Byte]以方便序列化,这个Array[Byte]的结构是[size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
execute().mapPartitionsInternal { iter =>
var count = 0
val buffer = new Array[Byte](4 << 10) // 4K
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val bos = new ByteArrayOutputStream()
val out = new DataOutputStream(codec.compressedOutputStream(bos))
// `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
// not hit.
while ((n < 0 || count < n) && iter.hasNext) {
val row = iter.next().asInstanceOf[UnsafeRow]
out.writeInt(row.getSizeInBytes)
row.writeToStream(out, buffer)
count += 1
}
out.writeInt(-1)
out.flush()
out.close()
Iterator((count, bos.toByteArray))
}
}

可以看到getByteArrayRdd中调用了execute方法,继而调用doExecute,得到一个RDD[InternalRow]

1
2
3
4
5
6
7
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
}
doExecute()
}
protected def doExecute(): RDD[InternalRow]

由于SparkPlan是一个抽象类,所以这里的doExecute()没有看到实现,具体的实现根据其操作对象的不同分布在objects.scala上。

Row

Row是SparkSQL的基石。它实际上是一个trait,我们经常使用是它的子类GenericRowGenericRowWithSchema,而Row的内部实现则是InternalRowGenericRow是Rowapply创建时的默认构造。它没有schema。在GenericRowWithSchema中重新实现了filedIndex这个函数,允许我们使用row.getAsString这样的方法。如果经常使用SparkSQL的API会发现我们不能从一个DataFrame通过map到一个Row的方式得到另一个DataFrame,反而可以从一个Seq得到,其原因就是因为DataFrame有schema而Row没有。我们通过下面的实验来检查从一个Seq`到DataFrame的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.functions.{concat, lit, udf}
import org.apache.spark.sql.{Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
case class Person(name: String, age: Long, money: Long)
import spark.implicits._

val ds = Seq(Person("Calvin", 22, 1)).toDS
val ds2 = Seq(Person("Neo", 23, 1)).toDS
val dfj = ds.union(ds2)
val dsj_fail = dfj.toDS // 注意DataFrame没有toDS方法,toDS是由RDD转DS用的
val dsj = dfj.as[Person]

val ds3 = Seq(("Calvin", 22, 1)).toDS
val ds4 = Seq(("Neo", 23, 1)).toDS
val dfj2 = ds3.union(ds4)

有关RowGenericRowWithSchema之间的转换,我们可以进行下面的实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 复用之前的头部
val df = Seq(Person("Calvin", 22, 100), Person("Neo", 33, 300)).toDF

// df的schema
scala> df.schema
res2: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,false), StructField(money,LongType,false))

// 第1行的schema
scala> df.take(1)(0).schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,false), StructField(money,LongType,false))

// 查看type,发现是GenericRowWithSchema而不是Row
scala> df.take(1)(0).getClass.getSimpleName
res5: String = GenericRowWithSchema

// 增加一列
scala> val r = Row.unapplySeq(df.take(1)(0)).get.toArray ++ Seq("SZ")
r: Array[Any] = Array(Calvin, 22, 100, SZ)

// 对应增加一列schema
scala> val nsch = df.take(1)(0).schema.add(StructField("addr",StringType,true))
nsch: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,false), StructField(money,LongType,false), StructField(addr,StringType,true))

// 创建一个新SchemaRow
scala> val row_sch = new GenericRowWithSchema(r, nsch)
row_sch: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema = [Calvin,22,100,SZ]

只有GenericRowWithSchema有,因此我们可以创建一个GenericRowWithSchema,其实现在org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}

从上文中可以看到,DataFrame中的数据依旧是按照行组织的,通过外挂了一个schema,我们能够有效地识别列。在这种情况下对行的改动是容易的,但是如何对列进行改动呢?一般有两种办法

借助于withColumn

1
2
3
4
val df = Seq(Person("Calvin", 22, 100), Person("Neo", 33, 300)).toDF
// 通过cast函数进行类型转换,concat函数进行字符串连接
df.withColumn("name2", concat($"name", $"age".cast(StringType))).show()
df.withColumn("name2", $"age"+$"money").show()

当然,在$表达式之外,我们还可以使用udf,甚至带条件地进行withColumn

1
2
3
// 除了$表达式,还可以使用udf
val addMoneyUDF = udf((age: Long, money: Long) => age + money)
df.withColumn("name2", addMoneyUDF($"age", $"money"))

特别需要注意的是withColumn是存在性能开销的。如果我们在代码里频繁(例如使用一个for循环)withColumn,那么就可能出现一个Job结束,而下一个Job迟迟不开始的情况。如果我们将日志等级设置为TRACE,可以看到代码中也存在了很多batch resolution的情况。这是因为较深层次的依赖会导致SparkSQL不能分清到底需要缓存哪些数据以用来恢复,因此只能全部缓存。另外文章中还表示会造成大量的analysis开销。此外,伴随着withColumn的是UDF或者UDAF的使用,在Spark the definitive一书中指出,这类的算子容易导致OOM等问题。

groupBy和groupByKey

不同于RDD的相关方法,DataFrame系列的groupBygroupByKey会返回两个不同的类型RelationalGroupedDatasetKeyValueGroupedDataset。一般来说,虽然groupByKey更为灵活,能够生成自定义的key用来group,但KeyValueGroupedDataset只提供相对较少的操作,所以最好还是使用groupby。另外,在group操作之后就没有诸如union的操作,我们需要再显式map回DataFrame

Spark性能调优

一般来说,Spark可能出现瓶颈的地方有内存、网络和CPU,对于上面的这些问题,宜分为Driver和Executor两块进行考虑
内存方面的向硬盘的溢写、从gc.log中看到的GC的猛增、节点的未响应和OOM。
网络问题的主要场景是诸如Shuffle类的操作涉及在多个节点上传输,节点之间Connection reset by peer。

Spark常见性能问题和选项

诊断 现象 解决方案
Executor内存不足 Driver端ExecutorLostFailure,Executor端gc.log显示大量GC和FullGC 需要考虑Shuffle Read数据过大,或者数据倾斜。对于前者,可以考虑增加分区数或者换个Partitioner,增加Executor内存,增加Executor数量,减少Executor上的Task并行度,提前Filter,使用序列化。
Executor内存不足 Local Bytes Read+Remote Bytes Read很大 考虑是Shuffle Read的问题,同上。需要注意的是当使用groupBy系列算子时,可能一个KV对就很大的,所以增加Executor内存会更保险
Driver内存不足 Driver端gc.log显示大量GC和FullGC,spark.log中DAGScheduler相关log显示collect算子耗时过长 考虑增大Driver内存,避免collect大量数据
Driver内存不足 Driver端gc.log显示大量GC和FullGC 减少UDF的使用,减少诸如withColumn的使用
Driver内存不足 Driver端gc.log显示大量GC和FullGC,Driver的spark.log中出现大量BlockManagerInfo: Added broadcast,并且剩余内存较少,Executor的spark.log中出现TorrentBroadcast: Reading broadcast事件且耗时过长 减少broadcast的数据量
数据倾斜 少数Task耗时显著高于平均值 考虑换个Partitioner,扩大spark.shuffle.file.bufferspark.reducer.maxSizeInFlightspark.shuffle.memoryFraction,打开spark.shuffle.consolidateFiles

替代性算子

为了避免由于Shuffle操作导致的性能问题,常用的解决方案是使用map-side-combine的算子。这个思路就是先将聚合操作下推到每个节点本地,再将每个节点上的聚合结果拉到同一节点上进行聚合,这样能够显著减少通信量。这种方法的常见实践就是采用如下所示的一些替代性算子:

原算子 替代算子 备注
groupByKey reduceByKey/aggregateByKey
reduceByKey aggregateByKey
foreach foreachPartitions
filter filter+coalesce
repartition+sort repartitionAndSortWithinPartitions
repartition coalesce 如果目标分区数量小于当前分区数量

有关Persist的优化方案

根据RDD Programming Guide,虽然Spark会自动做persist,但是对于肯定需要复用的数据,显式persist并没有坏处。这里需要注意的是我们要尽量提高RDD的复用程度。
一般来说,如果内存中能够全部放下对象,选择默认的MEMORY_ONLY级别能够最大程度利用CPU,否则就需要考虑使用序列化的MEMORY_ONLY_SER存储。当内存再不够时,就需要考虑将其持久化到磁盘上,但这会带来较高的时间代价。虽然在Spark的较新版本中,通过Unsafe Shuffle可以直接对序列化之后的对象进行sort shuffle,但这不是通用的。

Event log

Spark会记录Event log,并在History Server或者Spark UI中供访问调试使用。

HistoryServer

Spark提供了History Server以保存Event Log,以便追踪历史任务的性能。History Server部署在18080,可以使用WebUI,也可以使用18080的/api/vi/application的api来请求json版本。
这种方式需要在运行前手动export SPARK_MASTER_HOST=localhost(会被诸如start-master.sh等文件访问修改)或者sh ./sbin/start-master.sh -h localhost && ./sbin/start-slave.sh spark://localhost:7077可以通过-h指定localhost。不然可能Slave会连不上localhost,因为他会访问你的电脑名字,例如CALVINNEO-MB0:7077而不是localhost。
spark-defaults.conf中,有关Event Log的配置项有两种,一个是在HDFS上,一个是在硬盘上。

1
2
3
4
5
# 硬盘
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/user/spark/appHist
# HDFS
spark.history.fs.logDirectory .../spark-2.4.4-bin-hadoop2.7/conf/history/spark-events

这个在磁盘上,供给History Server用,但是实际上和HDFS的内容是一样的。需要注意的是,一旦spark.eventLog.enabled被设置为True,就需要保证9000是可以访问的,不然可能会报错。

spark log

gc log

常用调试方法

  1. 查看RDD的分区数

    1
    rdd.partitions.size
  2. 查看RDD的logical plan

    1
    rdd.toDebugString
  3. 查看queryExecution

    1
    d.queryExecution
  4. 查看schema

    1
    d.printSchema

Spark常见错误

变量在节点之间共享

当我们需要在节点间共享变量,例如将某个字符串从Driver发送到Executor上时,需要这个变量能够被序列化。特别地,有一个经典的Bug就是Map#mapValues不能被序列化,这个解决方案是在mapValues之后再map(identity)一下。
特别需要注意的是因为RDD是分布式存储的,所以不能够直接当做变量处理,例如下面的代码是不能够使用的。对于这种情况,要么是将其中一个小RDD广播,要不就是将两个RDD去做个JOIN。在SparkSQL中,JOIN操作会被视情况优化为广播。

1
2
3
rdd1.map{
rdd2.filter(...)
}

scala.collection.mutable.WrappedArray$ofRef cannot be cast to Integer

根据SoF,这个错误就是把Array改成Seq就好了。

Extracting Seq[(String,String,String)] from spark DataFrame

根据SoF

Spark的其他组件的简介

GraphX

GraphX是基于Spark实现的一个图计算框架,能够对图进行建模。GraphX内置了一些实用的方法,如PageRank、SCC等,同时也提供了Pregel算法的API,我们可以利用Pregel来实现自己的一些图算法。目前GraphX似乎还没有实用的Python API,比较方便的是借助Scala。

ML和MlLib

Streaming

Reference

  1. https://zhuanlan.zhihu.com/p/67068559
  2. http://www.jasongj.com/spark/rbo/
  3. https://www.kancloud.cn/kancloud/spark-internals/45243
  4. https://www.jianshu.com/p/4c5c2e535da5
  5. http://jerryshao.me/2014/01/04/spark-shuffle-detail-investigation/