博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark消息通信原理(三)——Spark运行时消息通信
阅读量:4579 次
发布时间:2019-06-08

本文共 10295 字,大约阅读时间需要 34 分钟。

一、Spark的应用程序执行过程:

        在Spark中,每一个“作业”称为一个应用程序(Application),每一个Application都必须有一个SparkContext,相当于application的入口,或者理解为环境。当用户(Client)提交应用程序(Application)时,该application的SparkContext就会向Master发送应用注册消息,master会根据该应用所需要的资源来分配Executor进程,Executor进程分布在各个Worker机器上面,并由WorkerEndpoint利用Driver的command环境来创建;Executor创建完成后,向Driver注册为该application的执行器,注册成功后,就向SparkContext发送注册成功消息;当SparkContext的RDD执行action算子时,就会触发执行操作,Driver根据RDD生成DAG图(有向无环图),通过DAGSecheduler进行划分stage,并将stage转化为TaskSet(一个TaskSet由多个Task组成,即Task的集合);接着由TaskSecheduler向已经注册的Executor发送执行消息,Executor接收到任务消息后,启动并运行;最后当所有任务运行结束后,由Driver处理各个executor的计算结果,并回收资源。

下面两图是Application的内部划分结构图:

        Spark的Application和SparkContext关联(正如上面所说的,Application通过SparkContext与各节点进行关联),每个Application中有一个或者多个job,可以并行或者串行运行job;job里面包含多个stage,stage以shuffle进行划分,stage包含多个task(一个partition就是一个task),多个task构成TaskSet,task则是任务的最小工作单元。

二、Spark运行消息通信交互过程

        执行应用程序需要启动SparkContext,在SparkContext启动中,会先在DriverEndpoint中实例化SchedulerBackend对象(Standalone模式下,实例化的是SparkDeploySchedulerBackend对象),该对象继承DriverEndpoint和ClientEndpoint两个终端点。

(1)ClientEndpoint向MasterEndpoint注册Application

        在Spark消息通信原理(二)提过,所有终端点都有tryRegisterAllMasters方法,用于向master注册某些消息。ClientEndpoint的tryRegisterAllMasters方法,则是用于向master注册Application的消息。

ptivate def tryRegisterAllMasters(): Array[JFuture[_]] = {    for(masterAddress <- masterRpcAddresses) yield {        //想线程池中启动注册线程,只要当线程读到的注册成功标识为true时,退出注册线程        registerMasterThreadPool.submit(new Runnable{            override def run():Unit = try{                //判断注册成功标识                if(registered){                    return                }                //获取Master终端点的引用,用来发送注册应用信息                val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)                //向master发送注册应用信息                masterRef.send(RegisterApplication(appDescription, self))            }catch{...}        })    }}

(2)MasterEndpoint处理申请注册Application的消息

        当Master接收到注册应用消息时,master在registerApplication方法中做了两件事:1、记录application信息,并加入到应用列表中(FIFO执行);2、注册完毕后,master发送RegisteredApplication消息给ClientEndpoint,同时调用startExecutorsOnWorkers方法,发送LaunchExecutor消息,通知Worker启动Executor;

ClientEndpoint收到RegisteredApplication消息时会更新相关状态:

case RegisteredApplication(appId_, masterRef) =>    appId.set(appId_)    registered.set(true)    master = Some(masterRef)    listener.connected(appId.get)

startExecutorsOnWorkers方法中,首先获取符合执行应用的worker节点,然后遍历通知这些worker启动相应的executor(可能是一个或多个):

private def startExecutorsOnWorkers() :Unit = {    //使用FIFO调度算法,先注册,先执行    for(app <- waitingApps if app.coresLeft > 0){        val coresPerExecutor:Option[Int] = app.desc.coresPerExecutor         //找出存活的、剩余内存大于等于启动Executor所需大小的、核数大于等于1的worker        val usableWorkers = workers.toArray.filter(_.state==WorkerState.ALIVE)            .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1))            .sortBy(_.coresFree).reverse         //确定应用运行在哪些worker上,以及每个worker分配用于运行的核数        //分配算法有两种:1、将应用运行在尽可能多的worker上;2、将应用运行在尽可能少的worker上        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)         //通知分配的worker启动worker        for(pos <- 0 until usableWorkers.length if assignedCores(pos) > 0){            allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))        }    }}

(3)Worker创建CoarseGrainedExecutorBackend对象,用于启动Executor进程

        当worker收到master发送的LaunchExecutor消息时,先实例化ExecutorRunner对象,实例化过程中会创建进程生成器(ProcessBuilder),然后由该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象就是Executor运行的容器,最后worker发送ExecutorStateChanged消息给Master。

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>    if (masterUrl != activeMasterUrl){        logWarning("InvalidMaster (" + masterUrl + ") attempted to launch executor.")    }else{        try{            //创建Executor执行目录            val executorDir =  new File(workDir, appId + "/" + execId)            if(!executorDir.mkdirs()){                throw new IOException("Failed to creata directory " + executorDir)            }             //通过SPARK_EXECUTOR_DIRS环境变量,在worker中创建Executor执行目录,当程序执行完毕后,由worker进行删除            val appLocalDirs = appDirectories.getOrElse(appId, Utils.getOrCreateLocalRootDirs(conf).map{ dir =>                    val appDir = Utils.createDirectory(dir, namePrefix = "executor")                    Utils.chmod700(appDir)                    appDir.getAbsolutePath()                }.toSeq)            appDirectories(appId) = appLocalDirs             //在ExecutorRunner中创建CoarseGrainedExecutorBackend对象,使用的是应用信息中的command,command则是在SchedulerBackend中创建的            val manager = new ExecutorRunner(                            appId,                            execId,                            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command,conf)),                            cores_,                            memory_,                            self,                            workerId,                            host,                            webUi.boundPort,                            publicAddress,                            sparkHome,                            executorDir,                            workerUri,                            conf,                            appLocalDirs,                            ExecutorState.RUNNING)            executors(appId + "/" + execId) = manager            manager.start()            coresUsed += cores_            memoryUsed += memory_             //向master发送ExecutorStateChanged消息,表示Executor状态已经更改为Executor.RUNNING            sendToMaster(ExecutorStateChanged(appId,execId,manager.state,None,None))        }catch{...}    }

(4)DriverEndpoint处理RegisterExecutor消息

        在第(3)点有提到,CoarseGrainedExecutorBackend对象是Executor的容器,该对象是在ExecutorRunner实例化时被创建,以及启动,启动时,会向DriverEndpoint发送RegisterExecutor消息。Driver接收到注册消息后,先判断需要注册的Executor是否已经被注册在列表当中,如果存在,则返回RegisterExecutorFailed消息返回CoarseGrainedExecutorBackend;如果不存在,则Driver会记录该Executor信息,并发送RegisteredExecutor消息。最后Driver分配任务所需资源,并发送LaunchTask消息。

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>    if(executorDataMap.contains(executorId)){        //判断列表是否已经存在该executor        executorRef.send(RegisterExecutorFailed("Duplicate executor ID:" + executorId))        context.reply(true)    }else{        ...        //1、记录该Executor的编号,以及需要的核数        addressToExecutorId(executorRef.address) = executorId        totalCoreCount.addAndGet(cores)        totalRegisteredExecutors.addAndGet(1)        val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)        //2、创建Executor编号和其具体信息的键值列表        CoarseGrainedSchedulerBackend.this.synchronized{            executorDataMap.put(executorId, data)            if(currentExecutorIdCounter < executorId.toInt){                currentExecutorIdCounter = executorId.toInt    //记录、更新当前executor数量            }            if(numPendingExecutors > 0){                numPendingExecutors -= 1            }        }        //3、向CoarseGrainedSchedulerBackend发送注册成功信息;        executorRef.send(RegisteredExecutor(executorAddress.host))        //4、并监听在总线中加入添加Executor事件        listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))         //5、分配资源,并向Executor发送LaunchTask任务消息        makeOffers()    }

(5)CoarseGrainedExecutorBackend实例化Executor对象

        当CoarseGrainedExecutorBackend收到来自Driver发过来的RegisteredExecutor消息时,就会实例化Executor对象。启动Executor完毕,就会定时向Driver发送心跳。由(3)(4)、(5)步骤来看,executor并不是由WorkerEndpoint直接创建,而是Worker先创建CoarseGrainedExecutorBackend对象,然后CoarseGrainedExecutorBackend对象向Driver注册Executor,注册成功后,才让CoarseGrainedExecutorBackend实例化Executor对象,最后Executor交给Driver管理。CoarseGrainedExecutorBackend处理RegisteredExecutor消息的源码:

case RegisteredExecutor =>    logInfo("Successfully registered with driver")    //根据环境实例化(启动)Executor    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

executor发送心跳,等待Driver下发任务:

private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")private def startDriverHeartbeater() : Unit = {    val intevalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")     //等待随机的时间间隔,这样,心跳在同步中不会结束    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]    val heartbeatTask = new Runnable(){        override def run():Unit = Utils.logUncaughtExceptions(reportHeartBeat())    }     //发送心跳    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)}

(6)DriverEndpoint向Executor发送LaunchTask消息

        executor接收到LaunchTask消息之后,就会执行任务。执行任务时,会创建TaskRunner进程,放到thredPool中,统一由Executor进行调度。任务执行完成后,分别给CoarseGrainedExecutorBackend和Driver发送状态变更,然后继续等待任务分配(Driver继续分配任务前,会先对执行结果进行处理)。

case LaunchTask(data) =>    if(executor == null){        //当executor没有实例化(启动),输出异常日志,并关闭Executor        logError("Received LaunchTask command but executor was null")        System.exit(1)    }else{        val taskDesc = ser.desrialize[TaskDescription](data.value)        logInfo("Got assigned task " + taskDesc.taskId)         //启动TaskRunner进程        executor.launchTask(this, taskId=taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)    }  //启动TaskRunner进程的方法def launchTask(context:ExecutorBackend, taskId:Long, attemptNumber:Int, taskName:String, serializedTask:ByteBuffer) : Unit = {    //创建当前task的TaskRunner    val tr = new TaskRunner(context, taskId=taskId, attemptNumber=attemptNumber, taskName, serializedTask)    //将当前task的TaskRunner放进threadPool里面,统一由Executor调度    runningTasks.put(taskId, tr)    threadPool.execute(tr)}

(7)Driver进行StatusUpdate

        当DriverEndpoint接收到Executor发送过来的StatusUpdate消息后,调用TaskSchedulerImpl的statusUpdate方法,根据不同executor执行后的结果进行处理,处理完毕后,继续给Executor发送LaunchTask消息。

case StatusUpdate(executorId, taskId, state, data) =>    scheduler.statusUpdate(taskId, state, data.value)    //scheduler是TaskSchedulerImpl的一个引用    if(TaskState.isFinished(state)){        executorDataMap.get(executorId) match{            case Some(executorInfo) =>                executorInfo.freeCores += scheduler.CPUS_PER_TASK                //继续向刚才的Executor发送LaunchTask消息,跟(4)中,Driver处理RegisterExecutor消息时调用的是同一个方法                makeOffers(executorId)                case None =>        }    }

        至此,不断重复(6)、(7)操作,直至所有任务执行完毕。

转载于:https://www.cnblogs.com/SysoCjs/p/11345395.html

你可能感兴趣的文章
Ural 1517. Freedom of Choice 后缀数组
查看>>
【转载】Maven入门实践
查看>>
1-4-03:奇偶数判断
查看>>
【SQL Server备份恢复】提高SQL Server备份速度
查看>>
命令行简介(附加参考资料)
查看>>
从0开始整合SSM框架-1.mybatis
查看>>
移位操作的疑问
查看>>
UILabel常用属性小结
查看>>
gitlab 邮件服务器配置
查看>>
Python 循环语句(while, for)
查看>>
深入理解JavaScript原型链
查看>>
LinearGradient类来实现图片的渐变效果
查看>>
Golang关键字—— if/else
查看>>
数据清洗
查看>>
PHP&MySQL(三)——数组
查看>>
各种语法解释及用法
查看>>
GPS.NET 和 GeoFramework开源了
查看>>
汇编:采用址表的方法编写程序实现C程序的switch功能
查看>>
AtiveMQ初次连接的 http error:503 连接错误 Prolem accessing /.Reason : Service Unavailable...
查看>>
OFO和摩拜共享单车
查看>>