在Yarn/MRV2/Spark里都有任务失败再重试的设置,Yarn作为资源管理框架,而MRV2/Spark作为计算框架,它们的失败重试有什么不同? 有哪些参数来控制失败重试? Spark on Yarn的时候参数又有什么影响?
Yarn的AM失败重试
Yarn作为资源管理框架,由RM负责AM(ApplicationMaster),具体的任务是由AM自己负责,所以Yarn对于一个Job的重试是在AM层级上的,其参数为 yarn.resourcemanager.am.max-attempts
或 yarn.resourcemanager.am.max-retries
,默认值为2,即如果一个Job的AM死掉了,RM会重新分配container重启AM一次,而对于container的挂掉,则由具体的AppMaster实现来管理,该参数判断代码如下:
// RMAppImpl.java public RMAppState transition(RMAppImpl app, RMAppEvent event) { int numberOfFailure = app.getNumFailedAppAttempts(); // other code... if (numberOfFailure >= app.maxAppAttempts) { app.isNumAttemptsBeyondThreshold = true; } app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); }
注意:如果一个Job失败了,可不一定会触发这个重试,Job失败并不代表其AM失败,Job失败的原因是有多种的
MRV2的Task失败重试
对于AM的失败次数,Yarn提供了用户设置参数来单独控制每个任务,可以覆盖Yarn的默认参数值,其中在MRV2里通过参数 mapreduce.am.max-attempts
体现,该参数的默认值也为2,AM的失败次数由它和 yarn.resourcemanager.am.max-attempts
一起决定,判断逻辑如下:
// RMAppImpl.java // yarn.resourcemanager.am.max-attempts int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); // mapreduce.am.max-attempts 或Spark里的 spark.yarn.maxAppAttempts int individualMaxAppAttempts = submissionContext.getMaxAppAttempts(); if (individualMaxAppAttempts <= 0 || individualMaxAppAttempts > globalMaxAppAttempts) { this.maxAppAttempts = globalMaxAppAttempts; LOG.warn("The specific max attempts: " + individualMaxAppAttempts + " for application: " + applicationId.getId() + " is invalid, because it is out of the range [1, " + globalMaxAppAttempts + "]. Use the global max attempts instead."); } else { this.maxAppAttempts = individualMaxAppAttempts; }
即在RMV2或Spark里的用户设置值在0到 yarn.resourcemanager.am.max-attempts
之间,那么取用户的设置值,如果不在,那么取 yarn.resourcemanager.am.max-attempts
设置值或默认值
由于AM控制了每个Job的运行,而Job由Map Task和Reduce Task组成,因此Job的失败就与Task相关,在MRV2里,提供了 mapreduce.map.maxattempts
和 mapreduce.reduce.maxattempts
两个值来控制MR Task的最大失败次数,两个参数的默认值都为4,但是在Uber模式的时候,两个参数的值被设为了1
// ReduceTaskImpl.java protected int getMaxAttempts() { return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4); } // MapTaskImpl.java protected int getMaxAttempts() { return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4); }
这两个参数的意思是,单个map task或reduce task的最大尝试次数是4,如果一个task尝试了4次还未成功,那么该Task就是失败的,从而整个Job也是失败的,这时由于AM并没有问题,所以不会引起Yarn对Job的重试
同时这两个参数是针对单个task的,并不是所有task的尝试次数总和,所以如果多个task都有失败,只要每个task的尝试次数不超过4次,Job就不是失败的,所以有时你看到一个Job有几十次或上百次失败,Job最后也是运行成功的!如下:
// TaskImpl.java // TaskImpl是一个abstract class,Map和Reduce有不同的实现,代表的是单个Task,所以这里判断的是单个Task的尝试次数 if (attemptState == TaskAttemptState.FAILED) { failedAttempts.add(attempt.getID()); if (failedAttempts.size() >= maxAttempts) { taces = TaskAttemptCompletionEventStatus.TIPFAILED; } }
Spark on Yarn
对于Spark on Yarn,yarn只负责启动和管理AM以及分配资源,Spark有自己的AM实现,当Executor运行起来后,任务的控制是由Driver负责的,所以在重试上Yarn只负责AM的重试,没有重试的参数冲突
同MRV2一样,Spark可以使用 spark.yarn.maxAppAttempts
参数控制AM的尝试次数,该参数没有默认值,如果不设置则保持Yarn的设置,如果有设置,则与MRV2的 mapreduce.am.max-attempts
参数判断逻辑一致
其次,在Spark对ApplicationMaster的实现里,Spark提供了参数 spark.yarn.max.executor.failures
来控制Executor的失败次数,其默认值是 numExecutors * 2
(如果dynamicallocation打开了,那么该值为最大的Executors个数乘2),同时其最小值不小于3。当Executor的失败次数达到这个值的时候,整个Spark Job(这里的Job是整个Spark任务,而不是DAG里的Job/Stage/Task)就失败了,判断逻辑如下:
// ApplicationMaster.scala private val maxNumExecutorFailures = { val effectiveNumExecutors = if (Utils.isDynamicAllocationEnabled(sparkConf)) { sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) } else { sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0) } // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need // avoid the integer overflow here. val defaultMaxNumExecutorFailures = math.max(3, if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors)) } sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures) // other code ... // judge if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, s"Max number of executor failures ($maxNumExecutorFailures) reached") } else { // ... }
对于Executor失败的原因,可能是OOM,也可能是心跳超时等等,Task的失败并不一定能导致Executor的失败
对于Task的失败,Spark还提供了参数 spark.task.maxFailures
来控制task的失败次数,其默认值是4,同一个Task失败的次数不能超过4次,否则Spark Job(Job是Spark任务,非DAG里的Job)就失败了。此参数无法限制Task总的失败次数,如果有多个Task失败,只要每个Task的失败次数不超过4次,Spark Job就是成功的!如下:
// TaskSetManager.scala // numFailures是一个数组,大小为numTasks,存的是每个task的失败次数 if (numFailures(index) >= maxTaskFailures) { logError("Task %d in stage %s failed %d times; aborting job".format( index, taskSet.id, maxTaskFailures)) abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:" .format(index, taskSet.id, maxTaskFailures, failureReason), failureException) return }
一个Spark app根据DAGScheduler可分为多个Job,Stage或Task,但是任务的重试次数跟Job或Stage无关
参数总结
参数 | 默认值 | 备注 | 设置位置 |
yarn.resourcemanager.am.max-attempts(yarn.resourcemanager.am.max-retries) | 2 | 控制AppMaster的重试 | Yarn的RM |
mapreduce.am.max-attempts | 2 | 覆盖Yarn的默认AppMaster的重试次数 | MRV2 App or hive |
mapreduce.map.maxattempts | 4 | 控制单个map任务的重试次数 | MRV2 App or hive |
mapreduce.reduce.maxattempts | 4 | 控制单个reduce任务的重试次数 | MRV2 App or hive |
spark.yarn.maxAppAttempts | none | 覆盖Yarn的默认AppMaster的重试次数 | Spark on Yarn App |
spark.yarn.max.executor.failures | numExecutors * 2 (twice the maximum number of executors if dynamicallocation is enabled), with a minimum of 3 | 控制Spark executor的失败重试次数 | Spark on Yarn App |
spark.task.maxFailures | 4 | 控制Spark单个task的最大失败次数 | Spark on Yarn App or Spark standalone App |
以上代码基于Hadoop 2.6.0和Spark 2.0.0
欢迎阅读转载,转载请注明出处:https://my.oschina.net/kavn/blog/1543769