To really understand what's going on, always check the code itself :). In particular: Receiver.scala, ReceiverSupervisorImpl.scala, DStream.scala, RawInputDStream.scala, ForEachDStream.scala.
Note 1: the code snippets here are pedagogical: experiments to get a working Spark Streaming pipeline. In production you would definitely want to clean some of this stuff up (print statements, the way the Thread is implemented, and so on).
How many RDDs are created, and when?
One obvious thing you need to do to prevent starvation, or drowning in data, is calibrate the windowing of RDDs. For a stream, a new RDD is created every n seconds. That is part of the fundamental contract for a receiver.
This is known as the "slide duration".
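As a minimal sketch (the names and the 5-second interval here are illustrative, not from the original application), the "n seconds" is the batch interval you pass when constructing the StreamingContext:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Every Seconds(5), the data buffered by the receiver is cut into a new RDD.
val conf = new SparkConf().setAppName("streaming-sketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(5))
```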
I've doctored up the Databricks Twitter application (it didn't work out of the box for me), here. The first thing I decided to play with was the slide duration implementation, to see when it was called, and how often.
(Figure: the slide duration implementation.) Note that the @transient variable annotation is used here. A good exercise for Spark newcomers is to determine why. Hint: look at the API docs for DStream, and ask yourself how Spark handles serialization.
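If you want to watch for yourself when the slide duration is consulted, one rough, hypothetical way (the class name and receiver type are mine, not the post's) is to subclass ReceiverInputDStream and log the calls:

```scala
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical instrumented input DStream: prints each time the scheduler
// asks for the slide duration, so you can see when and how often it is called.
class InstrumentedDStream(@transient ssc_ : StreamingContext, r: Receiver[String])
    extends ReceiverInputDStream[String](ssc_) {

  override def slideDuration: Duration = {
    println("slideDuration called at " + System.currentTimeMillis)
    super.slideDuration
  }

  override def getReceiver(): Receiver[String] = r
}
```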
Where in the code do the RDDs' contents get populated?
The way RDDs are created is that you call the store(...) function. From the Spark Streaming programming guide:
What happens when we store() something?
The org.apache.spark.streaming.receiver.Receiver class defines the contract for a receiver. It also defines the store(...) function, which literally just stores objects into a memory buffer, which is asynchronously flushed into Spark's distributed memory store. For a rough sketch of the communication loop, see the top of the post.
Why a store function? According to the Spark docs, this is a new feature which decouples our stream handlers from the RDD block API. This store function must be called from a thread which is launched in your onStart() implementation.
Why a thread? Because the onStart() method needs to return in order for the RDDs to begin streaming; otherwise, you get starvation. To get my Twitter stream working, I noticed that I had to return from the onStart() method, so that the streaming, and the calls to store(...), were occurring asynchronously. A minimal sketch of this pattern is below.
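Here is a minimal sketch of that pattern, assuming a trivial receiver that just produces strings (the class name and payload are mine, not the Twitter receiver from the post):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal sketch of a custom receiver: onStart() launches a thread and returns
// immediately; the thread calls store(...) asynchronously, as described above.
class LineReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    // Return quickly: the actual work happens on this background thread.
    new Thread("line-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // store() buffers the item; Spark pushes blocks to its memory store asynchronously.
          store("line produced at " + System.currentTimeMillis)
          Thread.sleep(100)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // The loop above exits once isStopped() becomes true.
  }
}
```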
3) How do you process the RDDs? The simplest thing to do is to call a forEach on the RDD stream. However, there are some operations (like rdd.count) which can cause starvation if you don't have enough workers.
Make sure you have more than one worker when running Spark Streaming in local mode! You do this with setMaster("local[2]"), for example. A sketch of consuming the stream this way follows.
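Putting the pieces together (a sketch, reusing the hypothetical LineReceiver and ssc from the snippets above):

```scala
// foreachRDD registers an output operation on the stream; rdd.count() is an
// action, so each batch is actually computed on the workers.
// Note setMaster("local[2]") earlier: one thread runs the receiver, the other does the processing.
val lines = ssc.receiverStream(new LineReceiver)
lines.foreachRDD { rdd =>
  println("batch size = " + rdd.count())
}
ssc.start()
ssc.awaitTermination()
```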
In any case, you process the RDDs using an rdd.forEach block, like this. I think it is funny, and probably bad, that Spark can't detect the fact that this code is broken and will never do anything useful. Let's see if we can find a way to fix it; see SPARK-4040 for details.
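The original snippet isn't reproduced here, but one common example of this kind of silently useless code (purely my illustration, not necessarily the snippet from the post) is a transformation inside foreachRDD with no action to materialize it:

```scala
// Looks like work, does nothing: map is lazy and no action is ever called on it,
// so nothing is computed for any batch, and Spark does not complain.
lines.foreachRDD { rdd =>
  rdd.map(line => line.toUpperCase)
}
```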
4) What if I don't process any RDDs in the stream... where do they go?
In this case, the streaming context itself should fail automatically: for a stream to start, you have to register at least one output operation which consumes the RDDs being produced.
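In other words (a sketch, assuming the ssc and lines values from the earlier snippets), registering even a trivial output operation is enough:

```scala
// Without at least one output operation (print(), foreachRDD, saveAs..., etc.)
// on some DStream, ssc.start() fails instead of silently dropping the data.
lines.print()   // a trivial output operation
ssc.start()
```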

