一、Spark的应用程序执行过程:
在Spark中,每一个“作业”称为一个应用程序(Application),每一个Application都必须有一个SparkContext,相当于application的入口,或者理解为环境。当用户(Client)提交应用程序(Application)时,该application的SparkContext就会向Master发送应用注册消息,master会根据该应用所需要的资源来分配Executor进程,Executor进程分布在各个Worker机器上面,并由WorkerEndpoint利用Driver的command环境来创建;Executor创建完成后,向Driver注册为该application的执行器,注册成功后,就向SparkContext发送注册成功消息;当SparkContext的RDD执行action算子时,就会触发执行操作,Driver根据RDD生成DAG图(有向无环图),通过DAGSecheduler进行划分stage,并将stage转化为TaskSet(一个TaskSet由多个Task组成,即Task的集合);接着由TaskSecheduler向已经注册的Executor发送执行消息,Executor接收到任务消息后,启动并运行;最后当所有任务运行结束后,由Driver处理各个executor的计算结果,并回收资源。下面两图是Application的内部划分结构图:
Spark的Application和SparkContext关联(正如上面所说的,Application通过SparkContext与各节点进行关联),每个Application中有一个或者多个job,可以并行或者串行运行job;job里面包含多个stage,stage以shuffle进行划分,stage包含多个task(一个partition就是一个task),多个task构成TaskSet,task则是任务的最小工作单元。
二、Spark运行消息通信交互过程
执行应用程序需要启动SparkContext,在SparkContext启动中,会先在DriverEndpoint中实例化SchedulerBackend对象(Standalone模式下,实例化的是SparkDeploySchedulerBackend对象),该对象继承DriverEndpoint和ClientEndpoint两个终端点。
(1)ClientEndpoint向MasterEndpoint注册Application
在Spark消息通信原理(二)提过,所有终端点都有tryRegisterAllMasters方法,用于向master注册某些消息。ClientEndpoint的tryRegisterAllMasters方法,则是用于向master注册Application的消息。ptivate def tryRegisterAllMasters(): Array[JFuture[_]] = { for(masterAddress <- masterRpcAddresses) yield { //想线程池中启动注册线程,只要当线程读到的注册成功标识为true时,退出注册线程 registerMasterThreadPool.submit(new Runnable{ override def run():Unit = try{ //判断注册成功标识 if(registered){ return } //获取Master终端点的引用,用来发送注册应用信息 val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) //向master发送注册应用信息 masterRef.send(RegisterApplication(appDescription, self)) }catch{...} }) }}
(2)MasterEndpoint处理申请注册Application的消息
当Master接收到注册应用消息时,master在registerApplication方法中做了两件事:1、记录application信息,并加入到应用列表中(FIFO执行);2、注册完毕后,master发送RegisteredApplication消息给ClientEndpoint,同时调用startExecutorsOnWorkers方法,发送LaunchExecutor消息,通知Worker启动Executor;ClientEndpoint收到RegisteredApplication消息时会更新相关状态:
case RegisteredApplication(appId_, masterRef) => appId.set(appId_) registered.set(true) master = Some(masterRef) listener.connected(appId.get)
startExecutorsOnWorkers方法中,首先获取符合执行应用的worker节点,然后遍历通知这些worker启动相应的executor(可能是一个或多个):
private def startExecutorsOnWorkers() :Unit = { //使用FIFO调度算法,先注册,先执行 for(app <- waitingApps if app.coresLeft > 0){ val coresPerExecutor:Option[Int] = app.desc.coresPerExecutor //找出存活的、剩余内存大于等于启动Executor所需大小的、核数大于等于1的worker val usableWorkers = workers.toArray.filter(_.state==WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse //确定应用运行在哪些worker上,以及每个worker分配用于运行的核数 //分配算法有两种:1、将应用运行在尽可能多的worker上;2、将应用运行在尽可能少的worker上 val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) //通知分配的worker启动worker for(pos <- 0 until usableWorkers.length if assignedCores(pos) > 0){ allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } }}
(3)Worker创建CoarseGrainedExecutorBackend对象,用于启动Executor进程
当worker收到master发送的LaunchExecutor消息时,先实例化ExecutorRunner对象,实例化过程中会创建进程生成器(ProcessBuilder),然后由该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象就是Executor运行的容器,最后worker发送ExecutorStateChanged消息给Master。case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl){ logWarning("InvalidMaster (" + masterUrl + ") attempted to launch executor.") }else{ try{ //创建Executor执行目录 val executorDir = new File(workDir, appId + "/" + execId) if(!executorDir.mkdirs()){ throw new IOException("Failed to creata directory " + executorDir) } //通过SPARK_EXECUTOR_DIRS环境变量,在worker中创建Executor执行目录,当程序执行完毕后,由worker进行删除 val appLocalDirs = appDirectories.getOrElse(appId, Utils.getOrCreateLocalRootDirs(conf).map{ dir => val appDir = Utils.createDirectory(dir, namePrefix = "executor") Utils.chmod700(appDir) appDir.getAbsolutePath() }.toSeq) appDirectories(appId) = appLocalDirs //在ExecutorRunner中创建CoarseGrainedExecutorBackend对象,使用的是应用信息中的command,command则是在SchedulerBackend中创建的 val manager = new ExecutorRunner( appId, execId, appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command,conf)), cores_, memory_, self, workerId, host, webUi.boundPort, publicAddress, sparkHome, executorDir, workerUri, conf, appLocalDirs, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ //向master发送ExecutorStateChanged消息,表示Executor状态已经更改为Executor.RUNNING sendToMaster(ExecutorStateChanged(appId,execId,manager.state,None,None)) }catch{...} }
(4)DriverEndpoint处理RegisterExecutor消息
在第(3)点有提到,CoarseGrainedExecutorBackend对象是Executor的容器,该对象是在ExecutorRunner实例化时被创建,以及启动,启动时,会向DriverEndpoint发送RegisterExecutor消息。Driver接收到注册消息后,先判断需要注册的Executor是否已经被注册在列表当中,如果存在,则返回RegisterExecutorFailed消息返回CoarseGrainedExecutorBackend;如果不存在,则Driver会记录该Executor信息,并发送RegisteredExecutor消息。最后Driver分配任务所需资源,并发送LaunchTask消息。case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) => if(executorDataMap.contains(executorId)){ //判断列表是否已经存在该executor executorRef.send(RegisterExecutorFailed("Duplicate executor ID:" + executorId)) context.reply(true) }else{ ... //1、记录该Executor的编号,以及需要的核数 addressToExecutorId(executorRef.address) = executorId totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls) //2、创建Executor编号和其具体信息的键值列表 CoarseGrainedSchedulerBackend.this.synchronized{ executorDataMap.put(executorId, data) if(currentExecutorIdCounter < executorId.toInt){ currentExecutorIdCounter = executorId.toInt //记录、更新当前executor数量 } if(numPendingExecutors > 0){ numPendingExecutors -= 1 } } //3、向CoarseGrainedSchedulerBackend发送注册成功信息; executorRef.send(RegisteredExecutor(executorAddress.host)) //4、并监听在总线中加入添加Executor事件 listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) //5、分配资源,并向Executor发送LaunchTask任务消息 makeOffers() }
(5)CoarseGrainedExecutorBackend实例化Executor对象
当CoarseGrainedExecutorBackend收到来自Driver发过来的RegisteredExecutor消息时,就会实例化Executor对象。启动Executor完毕,就会定时向Driver发送心跳。由(3)(4)、(5)步骤来看,executor并不是由WorkerEndpoint直接创建,而是Worker先创建CoarseGrainedExecutorBackend对象,然后CoarseGrainedExecutorBackend对象向Driver注册Executor,注册成功后,才让CoarseGrainedExecutorBackend实例化Executor对象,最后Executor交给Driver管理。CoarseGrainedExecutorBackend处理RegisteredExecutor消息的源码:case RegisteredExecutor => logInfo("Successfully registered with driver") //根据环境实例化(启动)Executor executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
executor发送心跳,等待Driver下发任务:
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")private def startDriverHeartbeater() : Unit = { val intevalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s") //等待随机的时间间隔,这样,心跳在同步中不会结束 val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int] val heartbeatTask = new Runnable(){ override def run():Unit = Utils.logUncaughtExceptions(reportHeartBeat()) } //发送心跳 heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)}
(6)DriverEndpoint向Executor发送LaunchTask消息
executor接收到LaunchTask消息之后,就会执行任务。执行任务时,会创建TaskRunner进程,放到thredPool中,统一由Executor进行调度。任务执行完成后,分别给CoarseGrainedExecutorBackend和Driver发送状态变更,然后继续等待任务分配(Driver继续分配任务前,会先对执行结果进行处理)。case LaunchTask(data) => if(executor == null){ //当executor没有实例化(启动),输出异常日志,并关闭Executor logError("Received LaunchTask command but executor was null") System.exit(1) }else{ val taskDesc = ser.desrialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) //启动TaskRunner进程 executor.launchTask(this, taskId=taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) } //启动TaskRunner进程的方法def launchTask(context:ExecutorBackend, taskId:Long, attemptNumber:Int, taskName:String, serializedTask:ByteBuffer) : Unit = { //创建当前task的TaskRunner val tr = new TaskRunner(context, taskId=taskId, attemptNumber=attemptNumber, taskName, serializedTask) //将当前task的TaskRunner放进threadPool里面,统一由Executor调度 runningTasks.put(taskId, tr) threadPool.execute(tr)}
(7)Driver进行StatusUpdate
当DriverEndpoint接收到Executor发送过来的StatusUpdate消息后,调用TaskSchedulerImpl的statusUpdate方法,根据不同executor执行后的结果进行处理,处理完毕后,继续给Executor发送LaunchTask消息。case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) //scheduler是TaskSchedulerImpl的一个引用 if(TaskState.isFinished(state)){ executorDataMap.get(executorId) match{ case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK //继续向刚才的Executor发送LaunchTask消息,跟(4)中,Driver处理RegisterExecutor消息时调用的是同一个方法 makeOffers(executorId) case None => } }
至此,不断重复(6)、(7)操作,直至所有任务执行完毕。