从源码角度看Spark on yarn client & cluster模式的本质区别


声明:本文转载自https://my.oschina.net/kavn/blog/1540548,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

首先区分下AppMaster和Driver,任何一个yarn上运行的任务都必须有一个AppMaster,而任何一个Spark任务都会有一个Driver,Driver就是运行SparkContext(它会构建TaskScheduler和DAGScheduler)的进程,当然在Driver上你也可以做很多非Spark的事情,这些事情只会在Driver上面执行,而由SparkContext上牵引出来的代码则会由DAGScheduler分析,并形成Job和Stage交由TaskScheduler,再由TaskScheduler交由各Executor分布式执行。

所以Driver和AppMaster是两个完全不同的东西,Driver是控制Spark计算和任务资源的,而AppMaster是控制yarn app运行和任务资源的,只不过在Spark on Yarn上,这两者就出现了交叉,而在standalone模式下,资源则由Driver管理。在Spark on Yarn上,Driver会和AppMaster通信,资源的申请由AppMaster来完成,而任务的调度和执行则由Driver完成,Driver会通过与AppMaster通信来让Executor的执行具体的任务。

client与cluster的区别

对于yarn-client和yarn-cluster的唯一区别在于,yarn-client的Driver运行在本地,而AppMaster运行在yarn的一个节点上,他们之间进行远程通信,AppMaster只负责资源申请和释放(当然还有DelegationToken的刷新),然后等待Driver的完成;而yarn-cluster的Driver则运行在AppMaster所在的container里,Driver和AppMaster是同一个进程的两个不同线程,它们之间也会进行通信,AppMaster同样等待Driver的完成,从而释放资源。

Spark里AppMaster的实现:org.apache.spark.deploy.yarn.ApplicationMaster Yarn里MapReduce的AppMaster实现:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

在yarn-client模式里,优先运行的是Driver(我们写的应用代码就是入口),然后在初始化SparkContext的时候,会作为client端向yarn申请AppMaster资源,当AppMaster运行后,它会向yarn注册自己并申请Executor资源,之后由本地Driver与其通信控制任务运行,而AppMaster则时刻监控Driver的运行情况,如果Driver完成或意外退出,AppMaster会释放资源并注销自己。所以在该模式下,如果运行spark-submit的程序退出了,整个任务也就退出了

在yarn-cluster模式里,本地进程则仅仅只是一个client,它会优先向yarn申请AppMaster资源运行AppMaster,在运行AppMaster的时候通过反射启动Driver(我们的应用代码),在SparkContext初始化成功后,再向yarn注册自己并申请Executor资源,此时Driver与AppMaster运行在同一个container里,是两个不同的线程,当Driver运行完毕,AppMaster会释放资源并注销自己。所以在该模式下,本地进程仅仅是一个client,如果结束了该进程,整个Spark任务也不会退出,因为Driver是在远程运行的

下面从源码的角度看看SparkSubmit的代码调用(基于Spark2.0.0):

代码公共部分

SparkSubmit#main =>

val appArgs = new SparkSubmitArguments(args) appArgs.action match {   // normal spark-submit   case SparkSubmitAction.SUBMIT => submit(appArgs)   // use --kill specified   case SparkSubmitAction.KILL => kill(appArgs)   // use --status specified   case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } 

SparkSubmit的main方法是在用户使用spark-submit脚本提交Spark app的时候调用的,可以看到正常情况下,它会调用SparkSubmit#submit方法

SparkSubmit#submit =>

val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) // 此处省略掉代理账户,异常处理,提交失败的重提交逻辑,只看主干代码 runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 

在submit方法内部,会先进行提交环境相关的处理,调用的是SparkSubmit#prepareSubmitEnvironment方法,之后利用拿到的mainClass等信息,再调用SparkSubmit#runMain方法来执行对于主函数

SparkSubmit#prepareSubmitEnvironment =>

主干相关的代码如下:

// yarn client mode if (deployMode == CLIENT) {   // client 模式下,运行的是 --class 后指定的mainClass,也即我们的代码   childMainClass = args.mainClass   if (isUserJar(args.primaryResource)) {     childClasspath += args.primaryResource   }   if (args.jars != null) { childClasspath ++= args.jars.split(",") }   if (args.childArgs != null) { childArgs ++= args.childArgs } }  // yarn cluster mode val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER if (isYarnCluster) {   // cluster 模式下,运行的是Client类   childMainClass = "org.apache.spark.deploy.yarn.Client"   if (args.isPython) {     childArgs += ("--primary-py-file", args.primaryResource)     childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")   } else if (args.isR) {     val mainFile = new Path(args.primaryResource).getName     childArgs += ("--primary-r-file", mainFile)     childArgs += ("--class", "org.apache.spark.deploy.RRunner")   } else {     if (args.primaryResource != SparkLauncher.NO_RESOURCE) {       childArgs += ("--jar", args.primaryResource)     }     // 这里 --class 指定的是AppMaster里启动的Driver,也即我们的代码     childArgs += ("--class", args.mainClass)   }   if (args.childArgs != null) {     args.childArgs.foreach { arg => childArgs += ("--arg", arg) }   } } 

在 prepareSubmitEnvironment 里,主要负责解析用户参数,设置环境变量env,处理python/R等依赖,然后针对不同的部署模式,匹配不同的运行主类,比如: yarn-client>args.mainClass,yarn-cluster>o.a.s.deploy.yarn.Client

SparkSubmit#runMain =>

骨干代码如下

try {   mainClass = Utils.classForName(childMainClass) } catch {   // ... } val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) try {   // childArgs就是用户自己传给Spark应用代码的参数   mainMethod.invoke(null, childArgs.toArray) } catch {   // ... } 

在runMain方法里,会设置ClassLoader,根据用户代码优先的设置(spark.driver.userClassPathFirst)来加载对应的类,然后反射调用prepareSubmitEnvironment方法返回的主类,并调用其main方法

从所反射的不同主类,我们来看看具体调用方式的不同:

对于yarn-cluster

o.a.s.deploy.yarn.Client#main =>

val sparkConf = new SparkConf  val args = new ClientArguments(argStrings) new Client(args, sparkConf).run() 

在Client伴生对象里构建了Client类的对象,然后调用了Client#run方法

o.a.s.deploy.yarn.Client#run =>

this.appId = submitApplication() // report application ... 

run方法核心的就是提交任务到yarn,其调用了Client#submitApplication方法,拿到提交完的appID后,监控app的状态

o.a.s.deploy.yarn.Client#submitApplication =>

try {   // 获取提交用户的Credentials,用于后面获取delegationToken   setupCredentials()   yarnClient.init(yarnConf)   yarnClient.start()    // Get a new application from our RM   val newApp = yarnClient.createApplication()   val newAppResponse = newApp.getNewApplicationResponse()   // 拿到appID   appId = newAppResponse.getApplicationId()   // 报告状态   reportLauncherState(SparkAppHandle.State.SUBMITTED)   launcherBackend.setAppId(appId.toString)    // Verify whether the cluster has enough resources for our AM   verifyClusterResources(newAppResponse)    // 创建AppMaster运行的context,为其准备运行环境,java options,以及需要运行的java命令,AppMaster通过该命令在yarn节点上启动   val containerContext = createContainerLaunchContext(newAppResponse)   val appContext = createApplicationSubmissionContext(newApp, containerContext)    // Finally, submit and monitor the application   logInfo(s"Submitting application $appId to ResourceManager")   yarnClient.submitApplication(appContext)   appId } catch {   case e: Throwable =>     if (appId != null) {       cleanupStagingDir(appId)     }     throw e } 

在 submitApplication 里完成了app的申请,AppMaster context的创建,最后完成了任务的提交,对于cluster模式而言,任务提交后本地进程就只是一个client而已,Driver就运行在与AppMaster同一container里,对于client模式而言,执行 submitApplication 方法时,Driver已经在本地运行,这一步就只是提交任务到yarn而已

o.a.s.deploy.yarn.Client#createContainerLaunchContext

createContainerLaunchContext方法的功能是创建AppMaster container context,在这里就会指定AppMaster里是否运行Driver,其主要代码如下:

val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) // 非pySpark时,pySparkArchives为Nil val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) // 这一步会进行delegationtoken的获取,存于Credentials,在AppMasterContainer构建完的最后将其存入到context里 val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)  val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) // 设置AppMaster container运行的资源和环境 amContainer.setLocalResources(localResources.asJava) amContainer.setEnvironment(launchEnv.asJava) // 设置JVM参数 val javaOpts = ListBuffer[String]() javaOpts += "-Djava.io.tmpdir=" + tmpDir // other java opts setting...  // 对于cluster模式,通过 --class 指定AppMaster运行我们的Driver端,对于client模式则纯作为资源申请和分配的工具 val userClass =   if (isClusterMode) {     Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))   } else {     Nil   } // 设置AppMaster运行的主类 val amClass =   if (isClusterMode) {     Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName   } else {     // ExecutorLauncher只是ApplicationMaster的一个warpper     Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName   }  val amArgs =   Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++     userArgs ++ Seq(       "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),         LOCALIZED_CONF_DIR, SPARK_CONF_FILE))  // Command for the ApplicationMaster val commands = prefixEnv ++ Seq(     YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"   ) ++   javaOpts ++ amArgs ++   Seq(     "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",     "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")  val printableCommands = commands.map(s => if (s == null) "null" else s).toList // 设置需运行的命令 amContainer.setCommands(printableCommands.asJava)  val securityManager = new SecurityManager(sparkConf) // 设置应用权限 amContainer.setApplicationACLs(       YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) // 设置delegationToken setupSecurityToken(amContainer) 

对于yarn-client

args.mainClass =>

在我们的Spark代码里,需要创建一个SparkContext来执行Spark任务,而在其构造器里创建TaskScheduler的时候,对于client模式就会向yarn申请资源提交任务,如下

// 调用createTaskScheduler方法,对于yarn模式,master=="yarn" val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts // 创建DAGScheduler _dagScheduler = new DAGScheduler(this) 

SparkContext#createTaskScheduler =>

这里会根据master匹配不同模式,比如local/standalone/yarn,在yarn模式下会利用ServiceLoader装载YarnClusterManager,然后由它创建TaskScheduler和SchedulerBackend,如下:

// 当为yarn模式的时候 case masterUrl =>   // 利用当前loader装载YarnClusterManager,masterUrl为"yarn"   val cm = getClusterManager(masterUrl) match {     case Some(clusterMgr) => clusterMgr     case None => throw new SparkException("Could not parse Master URL: '" + master + "'")   }   try {     // 创建TaskScheduler,这里masterUrl并没有用到     val scheduler = cm.createTaskScheduler(sc, masterUrl)     // 创建SchedulerBackend,对于client模式,这一步会向yarn申请AppMaster,提交任务     val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)     cm.initialize(scheduler, backend)     (backend, scheduler)   } catch {     case se: SparkException => throw se     case NonFatal(e) =>       throw new SparkException("External scheduler cannot be instantiated", e)   } 

YarnClusterManager#createSchedulerBackend

sc.deployMode match {   case "cluster" =>     new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)   case "client" =>     new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)   case  _ =>     throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") } 

可以看到yarn下的SchedulerBackend实现对于client和cluster模式是不同的,yarn-client模式为 YarnClientSchedulerBackend,yarn-cluster模式为 YarnClusterSchedulerBackend,之所以不同,是因为在client模式下,YarnClientSchedulerBackend 相当于 yarn application 的client,它会调用o.a.s.deploy.yarn.Client#submitApplication 来准备环境,申请资源并提交yarn任务,如下:

val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }  val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ("--arg", hostport)  val args = new ClientArguments(argsArrayBuf.toArray) totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) // 创建o.a.s.deploy.yarn.Client对象 client = new Client(args, conf) // 调用submitApplication准备环境,申请资源,提交任务,并把appID保存下来 // 对于submitApplication,前文有详细的分析,这里与前面是一致的 bindToYarn(client.submitApplication(), None) 

而在 YarnClusterSchedulerBackend 里,由于 AppMaster 已经运行起来了,所以它并不需要再做申请资源等等工作,只需要保存appID和attemptID并启动SchedulerBackend即可

欢迎阅读转载,转载请注明出处:https://my.oschina.net/kavn/blog/1540548

本文发表于2017年09月21日 00:37
(c)注:本文转载自https://my.oschina.net/kavn/blog/1540548,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2187 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1