SparkStreaming Source Code Analysis
Published: 2019-06-15


SparkStreaming Analysis (based on the 1.5 source code)

Introduction to SparkStreaming

SparkStreaming is a micro-batch stream processing framework whose core execution engine is Spark. It is well suited to scenarios that mix real-time data with historical data. Its processing flow is as follows:

1. Receive the live data stream and persist it.

2. Slice the stream into batches by time interval.

3. Treat each batch of data as an RDD and process it with RDD operations.

4. Generate one Spark job per batch, submit it to Spark for processing, and return the result.

                       

Introduction to DStream

DStream is the key programming abstraction in Spark Streaming. It represents a continuous stream of data, either received from a data source or produced by transforming another stream. A DStream is backed by a continuous sequence of RDDs:

        

Two kinds of operations can be applied to a DStream (similar to RDDs): transformations and output operations.

The dependencies formed by transformations between DStreams are kept in the DStreamGraph (the DStreamGraph is initialized when the StreamingContext is created), and the DStreamGraph periodically generates RDD DAGs.

A SparkStreaming Application

Taking WordCount as an example, a SparkStreaming application looks like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(_.split(" ")).map(w => (w, 1))
val wordCount = words.reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()   // keep the application running until it is stopped
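To feed this job data when running it locally, something must be listening on port 8585 before the job starts; on most systems running nc -lk 8585 in another terminal works, and each line typed there is counted in the next 10-second batch.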

 

Analysis of the Spark Streaming Execution Process

The Spark Streaming execution process is analyzed below using the WordCount application from the previous section.

StreamingContext Initialization

StreamingContext is the entry point for most streaming functionality; for example, it provides the methods for creating DStreams from various data sources. As the WordCount application shows, the first thing a streaming application does is create a StreamingContext.

Creating the StreamingContext creates the following main components:

1. A DStreamGraph, whose batch duration (the interval at which it is turned into an RDD graph) is set:

private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    cp_.graph.setContext(this)
    cp_.graph.restoreCheckpointData()
    cp_.graph
  } else {
    require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(batchDur_)
    newGraph
  }
}

2. A JobScheduler:

private[streaming] val scheduler = new JobScheduler(this)

 

DStream Creation and Transformation

Calling socketTextStream on the newly created StreamingContext creates a SocketInputDStream (a ReceiverInputDStream).

In the InputDStream inheritance hierarchy, InputDStream is the common parent, and SocketInputDStream and KafkaInputDStream are two of its concrete subclasses (both via ReceiverInputDStream).

 

On the JVM, constructing a subclass first runs its parent class constructors. So when a SocketInputDStream is created, InputDStream is initialized first, and the InputDStream constructor adds the stream itself to the DStreamGraph (via ssc.graph.addInputStream(this)).

Every ReceiverInputDStream subclass implements a getReceiver method, which returns the Receiver object. Taking SocketInputDStream as an example, it creates a SocketReceiver to receive data.
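As a rough, trimmed paraphrase of the 1.5 source (the real class sits under org.apache.spark.streaming.dstream and may differ in small details), SocketInputDStream does little more than hand back a SocketReceiver:

private[streaming] class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // The only job of this class: describe which Receiver should run on the executors.
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}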

DStream transformations, like RDD transformations, are lazily evaluated. When the application reaches print (an output operation), the DStream is wrapped in a ForEachDStream, and register is called to add it to the DStreamGraph's outputStreams list as an output stream.

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

What distinguishes ForEachDStream from other DStreams is that it overrides the generateJob method.

The transformation relationships between DStreams are expressed as dependencies, much like RDD dependencies.
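For instance, here is a lightly paraphrased version of FlatMappedDStream from the 1.5 source (check the actual file for exact details): each derived DStream records its parent in dependencies and rebuilds its RDD from the parent's RDD in compute.

private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => Traversable[U]
  ) extends DStream[U](parent.ssc) {

  // The "dependency" of this DStream: just its parent, analogous to an RDD lineage edge.
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // Per batch: fetch (or compute) the parent's RDD for that time and apply flatMap to it.
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}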

Startup Process

The application begins executing the streaming job by calling ssc.start(). The start method is implemented as follows:

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

 

The key call here is scheduler.start, where scheduler is the JobScheduler object created when the StreamingContext was instantiated, as described above. (The following analysis proceeds along the call chain.)

JobScheduler Creation and Execution

JobScheduler schedules the jobs that run on Spark: it uses a JobGenerator to generate jobs and then submits and runs them in parallel on a thread pool.

I. JobScheduler creation

The JobScheduler is created by the StreamingContext, which also triggers the call to its start method.

When the JobScheduler is initialized, it creates a thread pool (jobExecutor) and a jobGenerator, where:

jobExecutor is used to submit jobs; the number of threads in the pool is the job concurrency, configured by "spark.streaming.concurrentJobs" (default: 1).

jobGenerator is a JobGenerator instance, used to generate jobs from the DStreams. (A paraphrased sketch of these fields follows.)
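For reference, a lightly paraphrased sketch of how the 1.5 JobScheduler declares these fields (the ThreadUtils helper is in Spark core; the exact thread-pool name may differ slightly in the source):

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)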

 

II. JobScheduler execution

When the start method runs, it creates and starts the following services:

eventLoop: an EventLoop[JobSchedulerEvent] object that receives and processes events. Callers register events by calling its post method, and when the EventLoop starts it spawns a daemon thread to drain the event queue. EventLoop is an abstract class; JobScheduler implements its onReceive method so that every received event is handed to processEvent(event). (A minimal sketch of this event-loop pattern follows the code below.)

receiverTracker: a ReceiverTracker object that manages the execution of the receivers of the ReceiverInputDStreams. It must be created only after all input streams have been added to the DStreamGraph, because its constructor extracts the InputDStreams from the DStreamGraph so that their receivers can be pulled out when it starts.

jobGenerator: created when the JobScheduler was instantiated; it is started here.

This part of the code is shown below:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
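As referenced above, here is a minimal, self-contained sketch of the event-loop pattern that EventLoop implements. This is an illustration only, not the Spark class (the real EventLoop lives in org.apache.spark.util and has more careful error and shutdown handling):

import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  // Daemon thread that drains the queue and hands each event to onReceive.
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // block until something is posted
          try onReceive(event) catch { case NonFatal(t) => onError(t) }
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }

  // Callers register events here, just like JobScheduler's eventLoop.post(...).
  def post(event: E): Unit = eventQueue.put(event)

  // Subclasses decide how events are handled, e.g. by delegating to processEvent(event).
  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}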

 

When the JobScheduler starts, it mainly instantiates and starts the components above. The rest of this analysis splits the overall process into two parts and walks through it component by component.

  

1. The data receiving process (starting receivers, receiving data, generating blocks)

ReceiverTracker

It is created in the JobScheduler, which also calls its start method.

I. ReceiverTracker creation

When the ReceiverTracker is created, it sets up the following main attributes:

receiverInputStreams: the InputDStream instances that receive data, obtained via ssc.graph.getReceiverInputStreams. It holds the ReceiverInputDStream instances and their subclasses (SocketInputDStream, RawInputDStream, FlumePollingInputDStream, KafkaInputDStream, MQTTInputDStream, and so on).

receivedBlockTracker: a ReceivedBlockTracker instance that records the blocks received by the receivers. Operations that go through this class can be written to a WAL (write-ahead log) so that state can be recovered after a failure (a short configuration note follows this list).

schedulingPolicy: a ReceiverSchedulingPolicy instance used to schedule the receivers.

receiverTrackingInfos: a HashMap that keeps per-receiver information (key: receiverId, value: receiver info). It may only be accessed by the ReceiverTrackerEndpoint.
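A hedged aside on the WAL mentioned above: it is only written when it has been switched on in the application's configuration. A minimal way to enable it in the driver program (the configuration key comes from the 1.5 documentation; the checkpoint path below is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wordCount")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // turn the receiver WAL on
val ssc = new StreamingContext(conf, Seconds(10))
// The log files live under the checkpoint directory, so a (fault-tolerant)
// checkpoint directory must also be configured:
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // hypothetical path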

 

II. Execution of the start method

When start is called, it first checks whether receiverInputStreams is empty. If it is, there are no receivers at all and nothing is done; otherwise it creates a ReceiverTrackerEndpoint instance, endpoint.

ReceiverTrackerEndpoint is an RPC endpoint that receives messages sent by the receivers, handles them, and replies when necessary.

On a first start, receiver launching is not skipped (skipReceiverLaunch = false by default when the ReceiverTracker is created), so the launchReceivers() method is executed.

This part of the code is shown below:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

The launchReceivers method calls runDummySparkJob() and then sends a StartAllReceivers message to the ReceiverTrackerEndpoint.

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

The runDummySparkJob() method checks that slave nodes have registered, to avoid all receivers being scheduled on the same node. It does so by running one very simple job on Spark core and then checking whether any executors exist besides the driver. Its code is as follows:

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

After slave registration has been checked, all the receivers are extracted from the receiverInputStreams list and wrapped in a StartAllReceivers message that is sent to the endpoint.

ReceiverTrackerEndpoint

It receives messages sent by the receivers and by itself, handles them, and replies when necessary. The messages it can handle are:

StartAllReceivers, RestartReceiver, CleanupOldBlocks, UpdateReceiverRateLimit, and ReportError, as well as RegisterReceiver, AddBlock, DeregisterReceiver, AllReceiverIds, and StopAllReceivers.

When the ReceiverTrackerEndpoint receives StartAllReceivers, it uses the scheduling policy schedulingPolicy (described above) to work out a receiver placement (picking the executors that can run each receiver), and then starts them by calling startReceiver.

case StartAllReceivers(receivers) =>
  val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledExecutors(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
  }

The code for this is shown above; schedulingPolicy and startReceiver are explained in the next subsections.

ReceiverSchedulingPolicy (the scheduling policy)

This class schedules receivers and tries to distribute them evenly across executors. Receiver scheduling happens in two phases: global scheduling, when the ReceiverTracker starts and all receivers are scheduled at once via scheduleReceivers; and local scheduling, when an individual receiver is restarted and is placed via scheduleReceiver. Continuing from the previous subsection, we follow scheduleReceivers:

The ReceiverTrackerEndpoint triggers receiver scheduling by calling schedulingPolicy.scheduleReceivers(receivers, getExecutors), which takes two arguments: receivers, carried in the message, and the list of executors available for scheduling, obtained from the getExecutors method. The returned executors exclude any running on the same host as the driver, and each entry has the form "host:port".

When scheduleReceivers runs, it first groups the executors list into a Map keyed by host (host -> "host:port" entries) and then walks the receivers list, assigning executors to each receiver as follows:

Take a receiver from the list and check whether it has a preferredLocation (a preferred host). preferredLocation is declared in the Receiver base class and may be overridden by subclasses; SocketReceiver in the example above does not override it, so it has no preferred location and the executor that runs it can be chosen freely. (A sketch of a receiver that does override preferredLocation follows the scheduleReceivers code below.)

This part of the code is as follows:

/**
 * Try our best to schedule receivers with evenly distributed. However, if the
 * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
 * because we have to respect them.
 *
 * Here is the approach to schedule executors:
 * <ol>
 *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
 *       executors running on those host.</li>
 *   <li>Then, schedule all other receivers evenly among all the executors such that overall
 *       distribution over all the receivers is even.</li>
 * </ol>
 *
 * This method is called when we start to launch receivers at the first time.
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  if (receivers.isEmpty) {
    return Map.empty
  }

  if (executors.isEmpty) {
    return receivers.map(_.streamId -> Seq.empty).toMap
  }

  val hostToExecutors = executors.groupBy(_.split(":")(0))
  val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
  val numReceiversOnExecutor = mutable.HashMap[String, Int]()
  // Set the initial value to 0
  executors.foreach(e => numReceiversOnExecutor(e) = 0)

  // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
  // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
  for (i <- 0 until receivers.length) {
    // Note: preferredLocation is host but executors are host:port
    receivers(i).preferredLocation.foreach { host =>
      hostToExecutors.get(host) match {
        case Some(executorsOnHost) =>
          // preferredLocation is a known host. Select an executor that has the least receivers in
          // this host
          val leastScheduledExecutor =
            executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
          scheduledExecutors(i) += leastScheduledExecutor
          numReceiversOnExecutor(leastScheduledExecutor) =
            numReceiversOnExecutor(leastScheduledExecutor) + 1
        case None =>
          // preferredLocation is an unknown host.
          // Note: There are two cases:
          // 1. This executor is not up. But it may be up later.
          // 2. This executor is dead, or it's not a host in the cluster.
          // Currently, simply add host to the scheduled executors.
          scheduledExecutors(i) += host
      }
    }
  }

  // For those receivers that don't have preferredLocation, make sure we assign at least one
  // executor to them.
  for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
    // Select the executor that has the least receivers
    val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
    scheduledExecutorsForOneReceiver += leastScheduledExecutor
    numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
  }

  // Assign idle executors to receivers that have less executors
  val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
  for (executor <- idleExecutors) {
    // Assign an idle executor to the receiver that has least candidate executors.
    val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
    leastScheduledExecutors += executor
  }

  receivers.map(_.streamId).zip(scheduledExecutors).toMap
}
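As referenced above, here is a minimal sketch (not from the Spark source) of a custom receiver that overrides preferredLocation so that the policy tries to place it on a specific host. The hostname and the receiving logic are purely illustrative:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class PinnedReceiver(preferredHost: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Ask ReceiverSchedulingPolicy to place this receiver on preferredHost if an executor
  // is running there (hosts are matched against the "host" part of "host:port").
  override def preferredLocation: Option[String] = Some(preferredHost)

  def onStart(): Unit = {
    // Illustrative only: a daemon thread that produces one record per second via store(...).
    new Thread("pinned-receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store("tick " + System.currentTimeMillis())
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { /* nothing to clean up in this sketch */ }
}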

Once the executors have been chosen, startReceiver(receiver, executors) is called to launch the receiver on the designated executors.

The startReceiver method

The receiver is wrapped in an RDD and submitted with SparkContext.submitJob, which cleverly ships the receiver to the chosen executor and runs it there in the guise of an ordinary RDD job; the job's logic is the startReceiverFunc function. When startReceiverFunc runs, it creates a ReceiverSupervisorImpl object, supervisor, for the receiver shipped to that node, and then calls supervisor.start so that it supervises the receiver running in the worker. ReceiverSupervisor provides the interfaces needed to handle received data. The concrete implementation is shown below:

// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledExecutors.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
  }
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")

Once the receivers are running, the ReceiverTracker waits for the application to finish. Next we look at ReceiverSupervisorImpl.

ReceiverSupervisor

It supervises the receiver running in the worker and provides the interfaces needed to handle received data. Its start method is as follows:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

As the code shows, it runs two methods, onStart() and startReceiver(); onStart is implemented by the concrete subclass ReceiverSupervisorImpl.

The onStart method starts all the registered BlockGenerators. A BlockGenerator's job is to periodically turn the received data into blocks and add the generated blocks to a queue. BlockGenerator is covered in detail in a later subsection.

The startReceiver method starts the Receiver and is implemented as follows:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

onReceiverStart is implemented by ReceiverSupervisorImpl; its main job is to send a registration message (RegisterReceiver) to the ReceiverTracker and wait for the response (true on success, false otherwise).

The method then calls onStart on the Receiver (here, SocketReceiver), which creates and starts a daemon thread that actually receives the data. The onStart implementation is shown below:

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

The receive method is where data is actually received: it creates a Socket, reads data from it, converts the bytes into Strings wrapped in an Iterator, and calls store to save each record. Its implementation is as follows:

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}

The store method is defined in the Receiver base class. It calls pushSingle on the ReceiverSupervisor, which hands a single received record to the BlockGenerator; the BlockGenerator then adds the record to its buffer via addData.

BlockGenerator

As described in the previous section, when ReceiverSupervisor.start runs, onStart starts all the registered BlockGenerators (by calling their start methods). A BlockGenerator periodically turns received data into blocks and pushes the generated blocks onto a queue. Let's look at it in detail.

Important BlockGenerator attributes:

blockIntervalTimer: a RecurringTimer object. It creates a daemon thread that periodically invokes the callback passed as its third parameter; here that callback is the updateCurrentBuffer function.

blockPushingThread: the thread that runs keepPushingBlocks; blocksForPushing, the queue it drains, is an ArrayBlockingQueue.

When the BlockGenerator's start() method is called, it calls blockIntervalTimer.start and starts the blockPushingThread thread.

When blockIntervalTimer.start is called, it launches a background thread that periodically runs updateCurrentBuffer, which wraps the buffered data into a block and appends it to blocksForPushing. (A minimal sketch of this recurring-timer pattern follows.)
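For illustration only (this is not the Spark RecurringTimer, which lives in org.apache.spark.streaming.util and also supports a pluggable clock), here is a minimal sketch of the recurring-timer idea: a daemon thread that calls callback(time) once per period.

class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      // Align the first firing to the next period boundary, then fire every periodMs.
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      try {
        while (!stopped) {
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime)   // e.g. BlockGenerator's updateCurrentBuffer
          nextTime += periodMs
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
}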

Once blockPushingThread starts, it runs keepPushingBlocks(), which takes blocks from blocksForPushing and calls pushBlock on them; internally, pushBlock fires the BlockGeneratorListener's onPushBlock callback. That listener is passed in as a parameter when ReceiverSupervisorImpl creates the BlockGenerator.

When onPushBlock fires, it calls pushArrayBuffer, which in turn calls pushAndReportBlock. The pushAndReportBlock implementation is as follows:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

It stores the block data via receivedBlockHandler.storeBlock and sends an AddBlock message to the trackerEndpoint.

When the ReceiverTracker receives the AddBlock message, it wraps the information in a BlockAdditionEvent, writes it to the write-ahead log, and adds the block information to the stream's received-block queue (getReceivedBlockQueue).

 

Summary: the sections above described how data is received and stored. As mentioned earlier, when JobScheduler.start runs it also starts the jobGenerator component, which kicks off the data processing side. The jobGenerator is analyzed next.

 

2. The data processing process (converting DStreams into RDDs, generating jobs and submitting them)

JobGenerator

Important attributes:

timer: a RecurringTimer object, similar to the one used by BlockGenerator; here it fires once per batch interval and posts a GenerateJobs event each time.
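A lightly paraphrased version of this field from the 1.5 JobGenerator (formatting may differ slightly in the source) shows how each batch interval turns into a GenerateJobs event:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")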

 

When JobGenerator.start() is called, it creates and starts an eventLoop object and then calls the startFirstTime() method.

The eventLoop works the same way as the one in JobScheduler: received events are delegated to processEvent(event). Here processEvent is implemented as:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

startFirstTime() does two things: it calls graph.start and starts the timer (which begins firing GenerateJobs events).

DStreamGraph.start is implemented as follows:

def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}

It initializes and validates the outputStreams and calls start on each inputStream.

When a GenerateJobs event fires, processEvent runs generateJobs(time). That method eventually calls ReceivedBlockTracker.allocateBlocksToBatch(time), which takes the unallocated blocks from the received-block queues and groups them into one batch, records the result in timeToAllocatedBlocks (a HashMap from batch time to allocated blocks), and writes the event to the write-ahead log.

It then calls graph.generateJobs(time), which calls generateJob(time) on every output stream. Because the output stream (ForEachDStream in our example) overrides DStream's generateJob, the ForEachDStream version is invoked (not the DStream one). Its implementation is:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time) {
        ssc.sparkContext.setCallSite(creationSite)
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

parent.getOrCompute fetches the RDD for the given batch. Before following that call, recall the DStream chain built by the WordCount application: SocketInputDStream -> FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream.

  

getOrCompute (and compute, which works similarly) is defined in the DStream base class; if a subclass overrides it, the subclass version runs, otherwise the base class version does. getOrCompute recurses up the chain until it reaches SocketInputDStream. SocketInputDStream does not override getOrCompute, so the base class version runs; its code is:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}

The compute method is what generates the initial RDD for the given batch (a BlockRDD built from the received data). Its code is:

/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

After the BlockRDD has been generated, control returns to the caller one level up the recursion (FlatMappedDStream), whose compute is:

override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}

Once parent.getOrCompute (here SocketInputDStream.getOrCompute) returns, the RDD transformation is applied (this is how the RDD graph is built for the batch); when it finishes, control returns another level up for the next transformation, and so on until the recursion unwinds back to the entry point, ForEachDStream (the output stream).
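To make that unwinding concrete, here is a hedged illustration (not Spark source) of what a single batch of the WordCount pipeline effectively executes once the recursion has produced the batch's input RDD. It reuses sc from the WordCount application above, and sc.parallelize stands in for the BlockRDD that SocketInputDStream.compute builds:

import org.apache.spark.rdd.RDD

val batchInput: RDD[String] = sc.parallelize(Seq("a b", "b c"))  // stand-in for the BlockRDD
val perBatch = batchInput
  .flatMap(_.split(" "))     // applied by FlatMappedDStream.compute
  .map(w => (w, 1))          // applied by MappedDStream.compute
  .reduceByKey(_ + _)        // applied by ShuffledDStream.compute
perBatch.take(11).foreach(println)  // roughly what print()'s foreachFunc does for num = 10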

outputStream.generateJob(time) (implemented in ForEachDStream) then builds a Job from the current batch's RDD and the foreachFunc defined in DStream.print.

Once the jobs have been created, JobGenerator.generateJobs submits them via jobScheduler.submitJobSet.

submitJobSet is implemented as follows:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

In the end, each job in the JobSet is wrapped in a JobHandler and executed concurrently on the jobExecutor thread pool.

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batchid=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from $batchLinkText""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}

 

The code above is the core of JobHandler: it posts a JobStarted event to the EventLoop and calls Job.run():

def run() {
  _result = Try(func())
}

job.run() executes the function captured when the job was generated, i.e. foreachFunc. The take inside foreachFunc is an RDD action, so it triggers the actual Spark job submission and completes the processing of the batch.

 


 

Reposted from: https://www.cnblogs.com/barrenlake/p/4889190.html
