Spark源码阅读——任务提交过程


声明:本文转载自https://my.oschina.net/nalenwind/blog/1786172,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

插播一条广告→2021 ByteDance字节跳动内推←各城市、各方向的岗位都有,大量招人!


Spark 源码阅读——任务提交过程


当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者是count这类action来触发作业的提交,所以,当我们查看这些方法的源码时,发现底层都调用了SparkContext的runJob方法,而SparkContext的runJob方法又调用的DAGScheduler的runJob方法:

def runJob[T, U: ClassTag](   rdd: RDD[T],   func: (TaskContext, Iterator[T]) => U,   partitions: Seq[Int],     resultHandler: (Int, U) => Unit): Unit = {   if (stopped.get()) {     throw new IllegalStateException("SparkContext has been shutdown")   }   val callSite = getCallSite   val cleanedFunc = clean(func)   logInfo("Starting job: " + callSite.shortForm)   if (conf.getBoolean("spark.logLineage", false)) {     logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)   }   dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, esultHandler, localProperties.get)   progressBar.foreach(_.finishAll())   rdd.doCheckpoint() } 

这里以rdd和分区信息和对结果集处理的回调函数为参数进入到:

  def runJob[T, U](       rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int],       callSite: CallSite,       resultHandler: (Int, U) => Unit,       properties: Properties): Unit = {     val start = System.nanoTime     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)     // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,     // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that     // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's     // safe to pass in null here. For more detail, see SPARK-13747.     val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]     waiter.completionFuture.ready(Duration.Inf)(awaitPermission)     waiter.completionFuture.value.get match {       case scala.util.Success(_) =>         logInfo("Job %d finished: %s, took %f s".format           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))       case scala.util.Failure(exception) =>         logInfo("Job %d failed: %s, took %f s".format           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))         // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.         val callerStackTrace = Thread.currentThread().getStackTrace.tail         exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)         throw exception     }   } 

然后调用submitJob方法:

  def submitJob[T, U](       rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int],       callSite: CallSite,       resultHandler: (Int, U) => Unit,       properties: Properties): JobWaiter[U] = {     // Check to make sure we are not launching a task on a partition that does not exist.     val maxPartitions = rdd.partitions.length     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>       throw new IllegalArgumentException(         "Attempting to access a non-existent partition: " + p + ". " +           "Total number of partitions: " + maxPartitions)     }      val jobId = nextJobId.getAndIncrement()     if (partitions.size == 0) {       // Return immediately if the job is running 0 tasks       return new JobWaiter[U](this, jobId, 0, resultHandler)     }      assert(partitions.size > 0)     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)     eventProcessLoop.post(JobSubmitted(       jobId, rdd, func2, partitions.toArray, callSite, waiter,       SerializationUtils.clone(properties)))     waiter   } 

我们注意到里面有一行eventProcessLoop.post(JobSubmitted(...))的代码,这是向消息队列中放入一个作业提交的消息,由另一个线程来循环从队列中取出消息消费,执行相应的逻辑。我们可以看到在DAGScheduler类定义的最后一行调用了eventProcessLoop.start方法来启动这个时间循环线程。 在另一个线程中,通过scala的case class模式匹配并执行了DAGScheduler的handleJobSubmitted方法,这是一个比较核心的方法,所有生成stage,以及stage之间的依赖关系解析,作业的生成,都是在这里完成的。

private[scheduler] def handleJobSubmitted(jobId: Int,       finalRDD: RDD[_],       func: (TaskContext, Iterator[_]) => _,       partitions: Array[Int],       callSite: CallSite,       listener: JobListener,       properties: Properties) {     var finalStage: ResultStage = null     try {       // New stage creation may throw an exception if, for example, jobs are run on a       // HadoopRDD whose underlying HDFS files have been deleted.       finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)     } catch {       case e: Exception =>         logWarning("Creating new stage failed due to exception - job: " + jobId, e)         listener.jobFailed(e)         return     }      val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)     clearCacheLocs()     logInfo("Got job %s (%s) with %d output partitions".format(       job.jobId, callSite.shortForm, partitions.length))     logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")     logInfo("Parents of final stage: " + finalStage.parents)     logInfo("Missing parents: " + getMissingParentStages(finalStage))      val jobSubmissionTime = clock.getTimeMillis()     jobIdToActiveJob(jobId) = job     activeJobs += job     finalStage.setActiveJob(job)     val stageIds = jobIdToStageIds(jobId).toArray     val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))     listenerBus.post(       SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))     submitStage(finalStage)      submitWaitingStages()   } 

首先是在

finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) 

这里将stage之间的依赖关系解析出来,同时根据依赖关系从小到大生成stage id。

  private def newResultStage(       rdd: RDD[_],       func: (TaskContext, Iterator[_]) => _,       partitions: Array[Int],       jobId: Int,       callSite: CallSite): ResultStage = {     val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)     val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)     stageIdToStage(id) = stage     updateJobIdStageIdMaps(jobId, stage)     stage   } 

getParentStagesAndId:

  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {     val parentStages = getParentStages(rdd, firstJobId)     val id = nextStageId.getAndIncrement()     (parentStages, id)   }      private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {     val parents = new HashSet[Stage]     val visited = new HashSet[RDD[_]]     // We are manually maintaining a stack here to prevent StackOverflowError     // caused by recursively visiting     val waitingForVisit = new Stack[RDD[_]]     def visit(r: RDD[_]) {       if (!visited(r)) {         visited += r         // Kind of ugly: need to register RDDs with the cache here since         // we can't do it in its constructor because # of partitions is unknown         for (dep <- r.dependencies) {           dep match {             case shufDep: ShuffleDependency[_, _, _] =>               parents += getShuffleMapStage(shufDep, firstJobId)             case _ =>               waitingForVisit.push(dep.rdd)           }         }       }     }     waitingForVisit.push(rdd)     while (waitingForVisit.nonEmpty) {       visit(waitingForVisit.pop())     }     parents.toList   } 

可以看到这里使用栈结构深度依次遍历了每一个rdd的所有依赖,如果是shuffle dependency则生成shuffle stage,其他的依赖则先放到栈里,再依次遍历。这里在生成shuffleMapStage的过程中又会递归的调用getParentStagesAndId方法,所以最后生成的finalStage是一个处于依赖树最顶端的包含其所有依赖的子依赖树的结构,stage id的生成从依赖链最底端,从小到大生成。

之后以finalStage为参数调用submitStage来提交作业,但是在提交的过程中,它会依次递归的解析和提交每个stage所依赖的父stage,最终最先提交的是没有任何依赖的stage。

  private def submitStage(stage: Stage) {     val jobId = activeJobForStage(stage)     if (jobId.isDefined) {       logDebug("submitStage(" + stage + ")")       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {         val missing = getMissingParentStages(stage).sortBy(_.id)         logDebug("missing: " + missing)         if (missing.isEmpty) {           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")           submitMissingTasks(stage, jobId.get)         } else {           for (parent <- missing) {             submitStage(parent)           }           waitingStages += stage         }       }     } else {       abortStage(stage, "No active job for stage " + stage.id, None)     }   } 

通过submitMissingTasks提交stage的所有task。在submitMissingTasks方法中, 首先计算task的分发策略,

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {       stage match {         case s: ShuffleMapStage =>           partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap         case s: ResultStage =>           val job = s.activeJob.get           partitionsToCompute.map { id =>             val p = s.partitions(id)             (id, getPreferredLocs(stage.rdd, p))           }.toMap       }     } catch {       case NonFatal(e) =>         stage.makeNewStageAttempt(partitionsToCompute.size)         listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))         abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))         runningStages -= stage         return     } 

然后序列化task,

    var taskBinary: Broadcast[Array[Byte]] = null     try {       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).       // For ResultTask, serialize and broadcast (rdd, func).       val taskBinaryBytes: Array[Byte] = stage match {         case stage: ShuffleMapStage =>           JavaUtils.bufferToArray(             closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))         case stage: ResultStage =>           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))       } 

将序列化后的task广播出去,

  taskBinary = sc.broadcast(taskBinaryBytes) 

然后将tasks信息封装成task对象数组,

val tasks: Seq[Task[_]] = try {       stage match {         case stage: ShuffleMapStage =>           partitionsToCompute.map { id =>             val locs = taskIdToLocations(id)             val part = stage.rdd.partitions(id)             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,               taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)           }          case stage: ResultStage =>           val job = stage.activeJob.get           partitionsToCompute.map { id =>             val p: Int = stage.partitions(id)             val part = stage.rdd.partitions(p)             val locs = taskIdToLocations(id)             new ResultTask(stage.id, stage.latestInfo.attemptId,               taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)           }       }     } catch {       case NonFatal(e) =>         abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))         runningStages -= stage         return     } 

调用taskScheduler提交task集合

  taskScheduler.submitTasks(new TaskSet(         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) 

这个方法里主要是将taskSet交给TaskSetManager去管理,另外比较关键的是调用了schedulableBuilder中的addTaskSetManager,SchedulableBuilder本身是应用程序级别的调度器,它自己支持两种调度模式,一种是FIFO,另一种是FAIR,调度策略可以通过spark-env.sh中的spark.scheduler.mode进行具体的设置,默认情况下是FIFO。最后在submitTasks中调用了

  backend.reviveOffers() 

这里调用了CoarseGrainedSchedulerBackend.reviveOffers给driverEndpoint发送了一个ReviveOffers case object,这个消息其实是发给driverEndpoint自己的(详情见sparkde RpcEnv模块),也就是说最后处理的这个消息的还是driverEndpoint本身。这里会触发driverEndpoint的recieve方法然后路由到makeOffers方法。

    private def makeOffers() {       // Filter out executors under killing       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)       val workOffers = activeExecutors.map { case (id, executorData) =>         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)       }.toSeq       launchTasks(scheduler.resourceOffers(workOffers))     } 

在makeOffers方法中,首先准备好所有可以用于计算的Executor,然后找出可以的workOffers(代表了所有可用ExecutorBackend中可以使用的CPU Cores信息)WorkerOffer会告我们具体Executor可用的资源。而确定task具体运行在哪个ExecutorBackend上的算法是有TaskSetManager的resourceOffers方法决定的,具体算法我们后续讨论。再通过调用launchTask把任务发送给ExecutorBackend去执行。代码如下:

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {       for (task <- tasks.flatten) {         val serializedTask = ser.serialize(task)         if (serializedTask.limit >= maxRpcMessageSize) {           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>             try {               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +                 "spark.rpc.message.maxSize (%d bytes). Consider increasing " +                 "spark.rpc.message.maxSize or using broadcast variables for large values."               msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)               taskSetMgr.abort(msg)             } catch {               case e: Exception => logError("Exception in error callback", e)             }           }         }         else {           val executorData = executorDataMap(task.executorId)           executorData.freeCores -= scheduler.CPUS_PER_TASK            logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +             s"${executorData.executorHost}.")            executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))         }       }     } 

可以看到这里有将序列化后的task发送给executor的逻辑,所以整体的提交作业到这里就结束了。

本文发表于2018年03月28日 22:38
(c)注:本文转载自https://my.oschina.net/nalenwind/blog/1786172,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1226 讨论 0 喜欢 1

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1