博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark内核揭秘-07-DAGScheduler源码解读初体验
阅读量:7216 次
发布时间:2019-06-29

本文共 2733 字,大约阅读时间需要 9 分钟。

hot3.png

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:

进入其构造函数中:

可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:

MapOutputTrackerMaster:

BlockManagerMaster:

通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

private def initializeEventProcessActor() {  // blocking the thread until supervisor is started, which ensures eventProcessActor is  // not null before any job is submitted  // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null  implicit val timeout = Timeout(30 seconds)  val initEventActorReply =    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))  eventProcessActor = Await.result(initEventActorReply, timeout.duration).    asInstanceOf[ActorRef]}initializeEventProcessActor()

DAGSchedulerEventProcessActor:

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)  extends Actor with Logging {  override def preStart() {    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always    // valid when the messages arrive    // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候    // eventProcessActor已经准备就绪    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)  }  /**   * The main event loop of the DAG scheduler.   */  def receive = {    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,        listener, properties)    case StageCancelled(stageId) =>      dagScheduler.handleStageCancellation(stageId)    case JobCancelled(jobId) =>      dagScheduler.handleJobCancellation(jobId)    case JobGroupCancelled(groupId) =>      dagScheduler.handleJobGroupCancelled(groupId)    case AllJobsCancelled =>      dagScheduler.doCancelAllJobs()    case ExecutorAdded(execId, host) =>      dagScheduler.handleExecutorAdded(execId, host)    case ExecutorLost(execId) =>      dagScheduler.handleExecutorLost(execId, fetchFailed = false)    case BeginEvent(task, taskInfo) =>      dagScheduler.handleBeginEvent(task, taskInfo)    case GettingResultEvent(taskInfo) =>      dagScheduler.handleGetTaskResult(taskInfo)    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>      dagScheduler.handleTaskCompletion(completion)    case TaskSetFailed(taskSet, reason) =>      dagScheduler.handleTaskSetFailed(taskSet, reason)    case ResubmitFailedStages =>      dagScheduler.resubmitFailedStages()  }  override def postStop() {    // Cancel any active jobs in postStop hook    dagScheduler.cleanUpAfterSchedulerStop()  }}

可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

转载于:https://my.oschina.net/ghostmanyue/blog/369462

你可能感兴趣的文章
linux添加环境变量
查看>>
【uva 1312】Cricket Field(算法效率--技巧枚举)
查看>>
VS2017 MVC项目,新建控制器提示未能加载文件或程序集“Dapper.Contrib解决方法
查看>>
【ora-err】ORA-03113: end-of-file on communication channel
查看>>
00.索引-综述
查看>>
strcpy
查看>>
AC3 Rematrix
查看>>
C#之Windows Form Application与attribute
查看>>
函数与指针分析实例
查看>>
爬虫:pycurl模块的使用说明
查看>>
Halcon算子翻译——try
查看>>
Win732位安装PostgreSQL9
查看>>
Ext JS4学习笔记1——环境的搭建
查看>>
.net MVC3实现不同的角色用不同的登录页面
查看>>
Scala学习笔记-12
查看>>
eq与gt的妙用
查看>>
哈哈哈
查看>>
projectEuler pro10
查看>>
聚焦“云开发圆桌论坛”,大前端Serverless大佬们释放了这些讯号!
查看>>
数学模板
查看>>