SparkStreaming 分析 (基于1.5版本源码)
SparkStreaming 介绍
SparkStreaming是一个流式批处理框架,它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景。其处理流程如下:
1、 接收实时流数据并持久化
2、 将实时流以时间片切分成多个批次
3、 将每块(一个批次)的数据做为RDD,并用RDD操作处理数据
4、 每块数据生成一个SparkJob,提交Spark进行处理,并返回结果
Dstream 介绍
Spark Streaming中一个关键的程序抽象,表示从数据源获取持续性的数据流以及经过转换后的数据流。DStream由持续的RDD序列组成 :
作用于DStream上的操作有两种(与RDD类似):Transformation与Output。
DStream之间的的转换所形成的依赖关系保存在DStreamGraph中(DstreamGraph在StreamingContext创建时初始化),DStreamGraph会定期生成RDD DAG.
SparkStreaming应用
SparkStreaming应用程序,以WordCount为例,实现如下:
val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(10)) val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY) val words = lines.flatMap(_.split(" ")).map(w => (w,1)) val wordCount = words.reduceByKey(_+_) wordCount.print ssc.start()
Spark Streaming 执行过程分析
Spark Streaming执行过程,将依托第一部分应用程序(WordCount)进行分析
StreamingContext初始化过程
StreamingContext是很多Streaming功能的入口,如:它提供从多种数据源创建DStream的方法等。由上述WordCount应用可知,Streaming应用执行时首先会创建StreamingContext。
伴随StreamingContext的创建将会创建如下主要组件:
1、创建DStreamGraph,并为其设置转换成RDD Graph的时间间隔。
private[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { cp_.graph.setContext(this) cp_.graph.restoreCheckpointData() cp_.graph } else { require(batchDur_ != null, "Batch duration for StreamingContext cannot be null") val newGraph = new DStreamGraph() newGraph.setBatchDuration(batchDur_) newGraph } }
2、创建JobScheduler
private[streaming] val scheduler = new JobScheduler(this)
DStream创建及转换
利用刚刚创建的StreamingContext通过调用socketTextStream方法创建SocketInputDStream(ReceiverInputDstream).
InputDStream继承体系如下:(以SocketInputDStream与KafkaInputDStream为例)。
JAVA中初始化子类时,会先初始化其父类。所以在创建SocketInputDStream时,会先初始化InputDStream,在InputDStream中实现将自身加入DStreamGraph中(具体见上图)。
InputDStream子类都有一个getReceiver方法, 此方法用来获取Receiver对象. 以SocketInputDStream为例, 如上图, 其会创建SocketReceiver来接收数据.
DStream中算子的转换,类似于RDD中的转换,都是延迟计算。当上述应用遇到print--Output算子时,会将DStream转换为ForEachDStream,并调register方法作为OutputStream注册到DStreamGraph的outputStreams列表.
/** * Print the first num elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println("Time: " + time) println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() // scalastyle:on println } } new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() }
其中ForEachDStream不同于其它DStream的地方是其重载了generateJob方法。
所有DStream之间的转换关系,使用类似RDD的依赖来表示。
启动过程
应用程序通过调用ssc.start()方法,开始执行stream应用的执行.start方法具体实具如下所示:
def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { validate() // Start the streaming scheduler in a new thread, so that thread local properties // like call sites and job groups can be reset without affecting those of the // current thread. ThreadUtils.runInNewThread("streaming-start") { sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") scheduler.start() } state = StreamingContextState.ACTIVE } catch { case NonFatal(e) => logError("Error starting the context, marking it as stopped", e) scheduler.stop(false) state = StreamingContextState.STOPPED throw e } StreamingContext.setActiveContext(this) } shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) // Registering Streaming Metrics at the start of the StreamingContext assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }
其中, 最核心代码为scheduler.start, scheduler为JobScheduler对象. 如上文所述其在StreamingContext实例化时创建. (下面将按照函数调用流程进行分析)
JobScheduler创建及执行
JobScheuler用来调度运行在Spark上的作业, 它使用JobGenerator生成jobs, 然后使用一个线程池并行运行提交作业.
一. JobScheduler 创建:
JobScheduler由StreamingContext创建,并触发start调用.
JobScheduler初始化时,会创建一个ThreadPool(jobExecutor)和jobGenerator
其中:
jobExecutor用于提交作业.ThreadPool中线程的数量为Job并发量,由”spark.streaming.concurrentJobs”指定,默认为1.
JobGenerator为JobGenerator类实例.其用于依据DStreams生成jobs。
二. JobScheduler执行
start方法执行时会创建并启动以下服务:
eventLoop: EventLoop[JobSchedulerEvent]对象,用以接收和处理事件。调用者通过调用其post方法向事件队列注册事件。EventLoop开始执行时,会开启一deamon线程用于处理队列中的事件。EventLoop是一个抽象类,JobScheduler中初始化EventLoop时实现了其OnReceive方法。该方法中指定接收的事件由processEvent(event)方法处理。
receiverTracker: ReceiverTracker对象,用以管理ReceiverInputDStream中receiver的执行。这个对象必须在所有InputStream添加至DStreamGraph中后创建。因其实例化时会从DStreamGraph中抽取InputDStream. 以便用在其启动时抽取其中的Receiver。
jobGenertor:其在JobScheduler实例化时创建,在此处进行启动。
此部分代码实现,如下图所示:
def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() jobGenerator.start() logInfo("Started JobScheduler") }
JobScheduler启动时,主要实例化并启动上述组件,下面将整个过程分为两大块并按组件进行介绍。
1、数据接收过程(启动Receiver, 接收数据, 生成Block)
ReceiverTracker
在JobScheduler中创建并调用其start方法。
一、JobScheduler创建
JobScheduler创建时会创建以下主要属性:
receiverInputStream:接收数据的InputDStream实例,通过ssc.graph.getReceiverInputStreams获取,其内部存放ReceiverInputDStream实例及子类实例(包括:SocketInputDStream, RawInputDStream,FlumePollingInputDStream, KafkaInputDStream, MQTTInputDStream)
receiverdBlockTracker: ReceivedBlockTracker实例,用来记录Receiver接收的blocks。通过此类进行的操作可以保存一个WAL(write ahead log), 以便失败后进行保存
schedulingPolocy: ReceiverSchedulingPolicy实例,用于调度Receiver.
receiverTrackingInfo: HashMap对象, 用于维护receivers信息,key: receiverId, value: receiver info. 只能由ReceiverTrackerEndpoint访问。
二、start方法执行
start方法被调用时,首先其会判断receiverInputStream是否为空,如果为空,也就是没有任何Receiver,不做任何操作。当不为空时,其会创建ReceiverTrackerEndpoint实例endpoint.
ReceiverTrackerEndpoint 为RPC终端,用于接收各Receiver发送的消息并进行相应处理,必要时给予响应。
如果第一次启动,则默认不跳过启动Receiver(默认:skipReceiverLaunch = false,创建ReceiverTracker时默认指定),则执行launchReceivers()方法。
这部分代码如下图所示:
/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) if (!skipReceiverLaunch) launchReceivers() logInfo("ReceiverTracker started") trackerState = Started } }
其中launchReceivers方法,将调用runDummySparkJob()方法和向ReceiverTrackerEndpoint发送StartAllReceivers消息。
/** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. */ private def launchReceivers(): Unit = { val receivers = receiverInputStreams.map(nis => { val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr }) runDummySparkJob() logInfo("Starting " + receivers.length + " receivers") endpoint.send(StartAllReceivers(receivers)) }
runDummySparkJob()方法,用来确认slave节点是否注册,避免将所有receivers分发到同一节点。其通过执行一非常简单的任务,让SparkCore执行一次,然后通过其组件信息判断是否有除driver之外的Executor存在。runDummySparkJob代码如下:
/** * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the * receivers to be scheduled on the same node. * * TODO Should poll the executor number and wait for executors according to * "spark.scheduler.minRegisteredResourcesRatio" and * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job. */ private def runDummySparkJob(): Unit = { if (!ssc.sparkContext.isLocal) { ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() } assert(getExecutors.nonEmpty) }
检查完slave注册情况后,其会从receiverInputstreams列表中抽取所有Receivers,并用其创建StartAllReceivers消息发送给endpoint.
ReceiverTrackerEndpoint
用于接收Receiver及其自身发送的消息并进行处理,必要时进行响应。其可处理的消息有:
StartAllReceivers,、RestartReceivers、CleanupOldBlocks、UpdateReceiverRateLimit、ReportError以及RegisterReceiver、AddBlock、DeregisterReceiver、AllReceiverIds、StopAllReceivers.
当ReceiverTrackerEndpoint接收到StartAllReceivers时,其会通过调度策略shedulingPolicy(前文已描述) 生成Receiver分发策略(挑选出可运行Receiver的Executor),然后通过调用startReceiver进行启动。
case StartAllReceivers(receivers) => val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver <- receivers) { val executors = scheduledExecutors(receiver.streamId) updateReceiverScheduledExecutors(receiver.streamId, executors) receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation startReceiver(receiver, executors) }
此部分代码实现,如上所示。其中schedulingPolicy与startReceiver将在下小节进行说明。
ReceiverSchedulingPolicy 调度策略
此类用来,调度receivers, 并保证其均匀分布。Receiver调度分为两个阶段:一是全局调度,当ReceiverTracker启动时, 通过scheduleReceivers同时调度所有reciver;二是局部调度,当某个Receiver重启时发生,通过scheduleReceiver进行调度。此处接上小节,接着进行scheduleReceivers的解析:
在ReceiverTrackerEndpoint通过调用schedulingPolicy.scheduleReceivers(receivers, getExecutor)来触发Receivers调度, 其中包含两个参数: receivers和getExecutor. 第一个参数receivers以消息的形式进行传递;第二个参数代表的是可进行调度的Executor列表,用getExecutor方法进行获取。获取的Exectutor不包含与Driver在同一个主机上的节点。其中Executor信息格式为“host:port”
scheduleReceivers在执行时,首先将从executors列表转换成Map格式,单个元素host->”host:port”格式。 然后遍历receivers列表,为其逐个分配节点,分配过程如下:
从列表取出一个receiver,判断其是否具有preferredLocation.(可优先选择机器) 此方法在Receiver基类中声明,要求子类进行实现。上文举列中SocketReceiver未重写此方法,因此不具preferredLocation,因此未执行Receiver的Executor节点可以随意选取。
此部分代码如下
/** * Try our best to schedule receivers with evenly distributed. However, if the * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly * because we have to respect them. * * Here is the approach to schedule executors: **
* * This method is called when we start to launch receivers at the first time. */ def scheduleReceivers( receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = { if (receivers.isEmpty) { return Map.empty } if (executors.isEmpty) { return receivers.map(_.streamId -> Seq.empty).toMap } val hostToExecutors = executors.groupBy(_.split(":")(0)) val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String]) val numReceiversOnExecutor = mutable.HashMap[String, Int]() // Set the initial value to 0 executors.foreach(e => numReceiversOnExecutor(e) = 0) // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation", // we need to make sure the "preferredLocation" is in the candidate scheduled executor list. for (i <- 0 until receivers.length) { // Note: preferredLocation is host but executors are host:port receivers(i).preferredLocation.foreach { host => hostToExecutors.get(host) match { case Some(executorsOnHost) => // preferredLocation is a known host. Select an executor that has the least receivers in // this host val leastScheduledExecutor = executorsOnHost.minBy(executor => numReceiversOnExecutor(executor)) scheduledExecutors(i) += leastScheduledExecutor numReceiversOnExecutor(leastScheduledExecutor) = numReceiversOnExecutor(leastScheduledExecutor) + 1 case None => // preferredLocation is an unknown host. // Note: There are two cases: // 1. This executor is not up. But it may be up later. // 2. This executor is dead, or it's not a host in the cluster. // Currently, simply add host to the scheduled executors. scheduledExecutors(i) += host } } } // For those receivers that don't have preferredLocation, make sure we assign at least one // executor to them. for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) { // Select the executor that has the least receivers val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2) scheduledExecutorsForOneReceiver += leastScheduledExecutor numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1 } // Assign idle executors to receivers that have less executors val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1) for (executor <- idleExecutors) { // Assign an idle executor to the receiver that has least candidate executors. val leastScheduledExecutors = scheduledExecutors.minBy(_.size) leastScheduledExecutors += executor } receivers.map(_.streamId).zip(scheduledExecutors).toMap }- First, schedule all the receivers with preferred locations (hosts), evenly among the * executors running on those host.<> *
- Then, schedule all other receivers evenly among all the executors such that overall * distribution over all the receivers is even.<> *
当挑选出要执行的Executor后,调用startReceiver(receiver, executor)方法, 在指定executor是启动receiver.
startReceiver方法
将Receiver使用RDD进行包装,然后使用SparkContext.submitJob方法进行提交,使其巧妙的以普通RDD作业运行的方式将Receiver分发在选出的Executor方法上执行, 其执行逻辑为startReceiverFunc操作。当startReceiverFunc方法被调用时,会为分发到该节点中的receiver创建ReceiverSupervisorImpl对够象supervisor。 然后调用supervisor的start方法,使其监控运行在worker中的receiver。ReceiverSupervisor提供了处理接收数据的必要接口。这部分的具体实现如下代码所示:
// Function to start the receiver on the worker node val startReceiverFunc: Iterator[Receiver[_]] => Unit = (iterator: Iterator[Receiver[_]]) => { if (!iterator.hasNext) { throw new SparkException( "Could not start receiver as object not found.") } if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next() assert(iterator.hasNext == false) val supervisor = new ReceiverSupervisorImpl( receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) supervisor.start() supervisor.awaitTermination() } else { // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. } } // Create the RDD using the scheduledExecutors to run the receiver in a Spark job val receiverRDD: RDD[Receiver[_]] = if (scheduledExecutors.isEmpty) { ssc.sc.makeRDD(Seq(receiver), 1) } else { ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors)) } receiverRDD.setName(s"Receiver $receiverId") ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId") ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite())) val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ()) // We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete { case Success(_) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } case Failure(e) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } }(submitJobThreadPool) logInfo(s"Receiver ${receiver.streamId} started") }
当receiver执行起来后ReceiverTracker等待程序结束。接下来介绍ReceiverSupervisorImpl
ReceiverSupervisor
用监控运行在worker中的receiver,提供了处理接收数据的必要接口,其start方法执行逻辑如下:
/** Start the supervisor */ def start() { onStart() startReceiver() }
如上代码所述,其将执行onStart()及startReceiver()两个方法, onStart由其实现类ReceiverSupervisor类实现。
其中OnStart方法中会让所有注册的BlockGenerators执行, BlockGenerator的作用是定时使用接收的数据生成Block,并将生成的block加入队列。下一小节具体说明BlockGenerators。
startReceiver方法用于启动Receiver, 实现逻辑如下所述:
/** Start receiver */ def startReceiver(): Unit = synchronized { try { if (onReceiverStart()) { logInfo("Starting receiver") receiverState = Started receiver.onStart() logInfo("Called receiver onStart") } else { // The driver refused us stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None) } } catch { case NonFatal(t) => stop("Error starting receiver " + streamId, Some(t)) } }
其中的onReceiverStart由ReceiverSupervisorImpl实现, 其作用主要是:向ReceiverTracker发送注册消息(RegisterReceiver),并等待响应(成功:true; 否则:false.)。
方法通过调用Receiver(SocketReceiver)中的onStart 使用创建deamon线程并开启,该线程用于真实receive数据。其中onStart方法具体实现如下图所示:def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { setDaemon(true) override def run() { receive() } }.start() }
/** Create a socket connection and receive data until receiver is stopped */ def receive() { var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) while(!isStopped && iterator.hasNext) { store(iterator.next) } if (!isStopped()) { restart("Socket data stream had no more data") } else { logInfo("Stopped receiving") } } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case NonFatal(e) => logWarning("Error receiving data", e) restart("Error receiving data", e) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } }}
BlockGenerator
上节所述:ReceiverSupervisor调用start时会执行OnStart。OnStart方法中会让所有注册的BlockGenerators执行(调用start方法), BlockGenerator的作用是定时使用接收的数据生成Block,并将生成的block加入队列。下面将对其进行分析:
BlockGenerators重要属性:
blockIntervalTimer: RecurringTimer对象, 其做用创建daemon线程, 定时执行第三个参数callback传入函数. 此处为updateCurrentBuffer函数。
blockPushingThread: 用于执行keepPushingBlocks的线程,其中blocksForPushing类型为ArrayBlockingQueue.
当调用其start()方法时,其会启动调用blockIntervalTimer的start方法及启动blockPushingThread线程。
blockIntervalTimer的start方法被调用时,它将启动后台线程,定时执行updateCurrentBuffer操作,该操作将缓存中的数据,包装成block, 加入 blocksForPushing.
blockPushingThread线程开启后,会执行keepPushingBlocks() 方法, 其将blocksForPushing中的block信息调用pushBlock方法,这个方法内部触发BlockGeneratorListener的onPushBlock事件。该监听器在ReceiverSupervisorImpl创建BlockGenerator时做为参数传入。
BlockGeneratorListener的onPushBlock事件触发时,其将调用pushArrayBuffer方法,pushArrayBuffer进一步调用pushAndReportBlock方法。PushAndReportBlock方法具体实现如下:
/** Store block and report it to driver */ def pushAndReportBlock( receivedBlock: ReceivedBlock, metadataOption: Option[Any], blockIdOption: Option[StreamBlockId] ) { val blockId = blockIdOption.getOrElse(nextBlockId) val time = System.currentTimeMillis val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") }
其将利用receiverdBlockHandler.storeBlock存储block数据, 并向trackerEndpoint发送AddBlock消息。
ReceiverTracker接收到AddBlock信息后,其将信息包装成BlockAdditionEvent写入日志,并且将Block信息加入getReceiverdBlockQueue队列
小结: 以上部分,描述了数据的接收、存储过程。如上文所述,在JobSheduler执行start方法时,还将会启动jobGenerator组件,也就是启动数据处理的过程。下面将对jobGenerator进行处理。
2、数据处理过程(DStream转换成RDD, 生成Jobs并提交)
JobGenerator
重要属性:
Timer: RecurringTimer对象, 与JobScheduler中类似,在该对象中其会定期执行GenerateJobs方法。
在JobGenerator.start()被调用时,其将创建eventLoop对象并启动,以及调用startFirstTime()方法。
其中eventLoop与上文JobScheduler中的一样,其定义事件交由processEvent(event). 此处processEvent实现逻辑为:
/** Processes all events */ private def processEvent(event: JobGeneratorEvent) { logDebug("Got event " + event) event match { case GenerateJobs(time) => generateJobs(time) case ClearMetadata(time) => clearMetadata(time) case DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater) case ClearCheckpointData(time) => clearCheckpointData(time) } }
DStreamGraph.start实现逻辑如下:
def start(time: Time) { this.synchronized { require(zeroTime == null, "DStream graph computation already started") zeroTime = time startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validateAtStart) inputStreams.par.foreach(_.start()) } }
当GenerateJobs事件触发时, processEvent中会执行generateJobs(time)方法, 该方法最终会调用ReceiverBlockTracker.allocateBlocksToBatch(time)将receiverdBlockQueue中未分配的block生成一个批次,并将信息保存在timeToAllocatedBlocks(Hashmap<batchTime, allocatedBlocks>)中,同时记录日志信息。
然后调用graph.generateJobs(time)方法,调用所有outputStream.generateJob(time). 因为OutputStream(举例中为ForEachDStream)会重写DStream中的generateJob方法,此时会调用ForEachDStream中的generateJob.[不要错误的认为是DStream], 其具体实现为
override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } }
其中parent.getOrCompute用于获取指定批次的RDD。此处是调用parent.getOrCompute, 先来看一下WordCount应用中DStream的转换,如下图:
getOrCompute( compute方法与之类似)方法由DStream基类创建, 如果子类重写该方法,则执行子类方法; 若未重写,则执行基类中的方法。getOrCompute方法会进行递归,直至回溯至SocketInputDStream中。但SocketInputDStream中并未重写此方法,所以其将执行基类中方法,代码如下:
/** * Get the RDD corresponding to the given time; either retrieve it from cache * or compute-and-cache it. */ private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { // If RDD was already generated, then retrieve it from HashMap, // or else compute the RDD generatedRDDs.get(time).orElse { // Compute the RDD if time is valid (e.g. correct time in a sliding window) // of RDD generation, else generate nothing. if (isTimeValid(time)) { val rddOption = createRDDWithLocalProperties(time) { // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. We need to have this call here because // compute() might cause Spark jobs to be launched. PairRDDFunctions.disableOutputSpecValidation.withValue(true) { compute(time) } } rddOption.foreach { case newRDD => // Register the generated RDD for caching and checkpointing if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel") } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") } generatedRDDs.put(time, newRDD) } rddOption } else { None } } }
其中通过执行compute方法来生成批定批次的初始RDD(利用接收到的数据生成BlockRDD)。compute方法代码如下:
/** * Generates RDDs with blocks received by the receiver of this stream. */ override def compute(validTime: Time): Option[RDD[T]] = { val blockRDD = { if (validTime < graph.startTime) { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // driver failure without any write ahead log to recover pre-failure data. new BlockRDD[T](ssc.sc, Array.empty) } else { // Otherwise, ask the tracker for all the blocks that have been allocated to this stream // for this batch val receiverTracker = ssc.scheduler.receiverTracker val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // Register the input blocks information into InputInfoTracker val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // Create the BlockRDD createBlockRDD(validTime, blockInfos) } } Some(blockRDD) }
生成BlockRDD后,返回递归上层(FlatMappedDStream中)继续执行,上层代码如下:
override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) }
其从parrent.getOrCompute(SocketInputDStream.getOrCompute)返回后将进行RDD的转换(生成RDD Graph的过程),执行完成会再返回递归上层进行RDD转换, 直至回到调用入口ForEachDStream(outputStream)中。
outputStream.generateJob(time)【该方法在ForEachDStream中实现】会使用foreachFunc方法(DStream.print中定义)及当前批次创建Job.
当创建完Job后,JobGenerator.generateJobs会使用jobScheduler.submitJobSet提交作业。
SubmitJobSet具体实现如下:
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
private class JobHandler(job: Job) extends Runnable with Logging { import JobScheduler._ def run() { try { val formattedTime = UIUtils.formatBatchTime( job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s"/streaming/batchid=${job.time.milliseconds}" val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]" ssc.sc.setJobDescription( s"""Streaming job from $batchLinkText""") ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) // We need to assign `eventLoop` to a temp variable. Otherwise, because // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then // it's possible that when `post` is called, `eventLoop` happens to null. var _eventLoop = eventLoop if (_eventLoop != null) { _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // Disable checks for existing output directories in jobs launched by the streaming // scheduler, since we may need to write output to an existing directory during checkpoint // recovery; see SPARK-4835 for more details. PairRDDFunctions.disableOutputSpecValidation.withValue(true) { job.run() } _eventLoop = eventLoop if (_eventLoop != null) { _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) } } else { // JobScheduler has been stopped. } } finally { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) } } }}
上边代码为JobHandler核心代码。其将向EventLop发送一JobStarted事件,及调用Job.run()方法。
def run() { _result = Try(func()) }
其中job.run()方法会执行生成job时的函数foreachFunc。foreachFunc中的take操作为action操作会触发作业提交,从而完成数据处理操作。
本文地址: