从源码角度对比Yarn下的App对任务失败的控制


声明:本文转载自https://my.oschina.net/kavn/blog/1543769,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

在Yarn/MRV2/Spark里都有任务失败再重试的设置,Yarn作为资源管理框架,而MRV2/Spark作为计算框架,它们的失败重试有什么不同? 有哪些参数来控制失败重试? Spark on Yarn的时候参数又有什么影响?

Yarn的AM失败重试

Yarn作为资源管理框架,由RM负责AM(ApplicationMaster),具体的任务是由AM自己负责,所以Yarn对于一个Job的重试是在AM层级上的,其参数为 yarn.resourcemanager.am.max-attemptsyarn.resourcemanager.am.max-retries,默认值为2,即如果一个Job的AM死掉了,RM会重新分配container重启AM一次,而对于container的挂掉,则由具体的AppMaster实现来管理,该参数判断代码如下:

// RMAppImpl.java  public RMAppState transition(RMAppImpl app, RMAppEvent event) {   int numberOfFailure = app.getNumFailedAppAttempts();   // other code...   if (numberOfFailure >= app.maxAppAttempts) {     app.isNumAttemptsBeyondThreshold = true;   }   app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); } 

注意:如果一个Job失败了,可不一定会触发这个重试,Job失败并不代表其AM失败,Job失败的原因是有多种的

MRV2的Task失败重试

对于AM的失败次数,Yarn提供了用户设置参数来单独控制每个任务,可以覆盖Yarn的默认参数值,其中在MRV2里通过参数 mapreduce.am.max-attempts 体现,该参数的默认值也为2,AM的失败次数由它和 yarn.resourcemanager.am.max-attempts 一起决定,判断逻辑如下:

// RMAppImpl.java  // yarn.resourcemanager.am.max-attempts int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); // mapreduce.am.max-attempts 或Spark里的 spark.yarn.maxAppAttempts int individualMaxAppAttempts = submissionContext.getMaxAppAttempts(); if (individualMaxAppAttempts <= 0 || individualMaxAppAttempts > globalMaxAppAttempts) {   this.maxAppAttempts = globalMaxAppAttempts;   LOG.warn("The specific max attempts: " + individualMaxAppAttempts           + " for application: " + applicationId.getId()           + " is invalid, because it is out of the range [1, "           + globalMaxAppAttempts + "]. Use the global max attempts instead."); } else {   this.maxAppAttempts = individualMaxAppAttempts; } 

即在RMV2或Spark里的用户设置值在0到 yarn.resourcemanager.am.max-attempts 之间,那么取用户的设置值,如果不在,那么取 yarn.resourcemanager.am.max-attempts 设置值或默认值

由于AM控制了每个Job的运行,而Job由Map Task和Reduce Task组成,因此Job的失败就与Task相关,在MRV2里,提供了 mapreduce.map.maxattemptsmapreduce.reduce.maxattempts 两个值来控制MR Task的最大失败次数,两个参数的默认值都为4,但是在Uber模式的时候,两个参数的值被设为了1

// ReduceTaskImpl.java protected int getMaxAttempts() {   return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4); }  // MapTaskImpl.java protected int getMaxAttempts() {   return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4); } 

这两个参数的意思是,单个map task或reduce task的最大尝试次数是4,如果一个task尝试了4次还未成功,那么该Task就是失败的,从而整个Job也是失败的,这时由于AM并没有问题,所以不会引起Yarn对Job的重试

同时这两个参数是针对单个task的,并不是所有task的尝试次数总和,所以如果多个task都有失败,只要每个task的尝试次数不超过4次,Job就不是失败的,所以有时你看到一个Job有几十次或上百次失败,Job最后也是运行成功的!如下:

// TaskImpl.java  // TaskImpl是一个abstract class,Map和Reduce有不同的实现,代表的是单个Task,所以这里判断的是单个Task的尝试次数 if (attemptState == TaskAttemptState.FAILED) {   failedAttempts.add(attempt.getID());   if (failedAttempts.size() >= maxAttempts) {     taces = TaskAttemptCompletionEventStatus.TIPFAILED;   } } 

Spark on Yarn

对于Spark on Yarn,yarn只负责启动和管理AM以及分配资源,Spark有自己的AM实现,当Executor运行起来后,任务的控制是由Driver负责的,所以在重试上Yarn只负责AM的重试,没有重试的参数冲突

同MRV2一样,Spark可以使用 spark.yarn.maxAppAttempts 参数控制AM的尝试次数,该参数没有默认值,如果不设置则保持Yarn的设置,如果有设置,则与MRV2的 mapreduce.am.max-attempts 参数判断逻辑一致

其次,在Spark对ApplicationMaster的实现里,Spark提供了参数 spark.yarn.max.executor.failures 来控制Executor的失败次数,其默认值是 numExecutors * 2(如果dynamicallocation打开了,那么该值为最大的Executors个数乘2),同时其最小值不小于3。当Executor的失败次数达到这个值的时候,整个Spark Job(这里的Job是整个Spark任务,而不是DAG里的Job/Stage/Task)就失败了,判断逻辑如下:

// ApplicationMaster.scala  private val maxNumExecutorFailures = {   val effectiveNumExecutors =     if (Utils.isDynamicAllocationEnabled(sparkConf)) {       sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)     } else {       sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)     }   // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need   // avoid the integer overflow here.   val defaultMaxNumExecutorFailures = math.max(3,     if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors)) }   sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)  // other code ...  // judge if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {   finish(FinalApplicationStatus.FAILED,     ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,     s"Max number of executor failures ($maxNumExecutorFailures) reached") } else {   // ... } 

对于Executor失败的原因,可能是OOM,也可能是心跳超时等等,Task的失败并不一定能导致Executor的失败

对于Task的失败,Spark还提供了参数 spark.task.maxFailures 来控制task的失败次数,其默认值是4,同一个Task失败的次数不能超过4次,否则Spark Job(Job是Spark任务,非DAG里的Job)就失败了。此参数无法限制Task总的失败次数,如果有多个Task失败,只要每个Task的失败次数不超过4次,Spark Job就是成功的!如下:

// TaskSetManager.scala  // numFailures是一个数组,大小为numTasks,存的是每个task的失败次数 if (numFailures(index) >= maxTaskFailures) {   logError("Task %d in stage %s failed %d times; aborting job".format(     index, taskSet.id, maxTaskFailures))   abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"     .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)   return } 

一个Spark app根据DAGScheduler可分为多个Job,Stage或Task,但是任务的重试次数跟Job或Stage无关

参数总结

参数默认值备注设置位置
yarn.resourcemanager.am.max-attempts(yarn.resourcemanager.am.max-retries)2控制AppMaster的重试Yarn的RM
mapreduce.am.max-attempts2覆盖Yarn的默认AppMaster的重试次数MRV2 App or hive
mapreduce.map.maxattempts4控制单个map任务的重试次数MRV2 App or hive
mapreduce.reduce.maxattempts4控制单个reduce任务的重试次数MRV2 App or hive
spark.yarn.maxAppAttemptsnone覆盖Yarn的默认AppMaster的重试次数Spark on Yarn App
spark.yarn.max.executor.failuresnumExecutors * 2 (twice the maximum number of executors if dynamicallocation is enabled), with a minimum of 3控制Spark executor的失败重试次数Spark on Yarn App
spark.task.maxFailures4控制Spark单个task的最大失败次数Spark on Yarn App or Spark standalone App

以上代码基于Hadoop 2.6.0和Spark 2.0.0

欢迎阅读转载,转载请注明出处:https://my.oschina.net/kavn/blog/1543769

本文发表于2017年09月26日 20:34
(c)注:本文转载自https://my.oschina.net/kavn/blog/1543769,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2370 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1