Spark Streaming: Streaming Word Count

In the last post, we used Flume from the Hadoop toolset to stream data from Twitter to an HDFS location for analysis. In this post, we will again process streaming data, but this time with Spark Streaming, a popular and efficient framework for ingesting data from a source and processing it.


Streaming data from HDFS and processing it to get word counts

We will walk through a simple example of streaming data from an HDFS folder on your Hadoop cluster into Spark, where we will process it. The goal is to get the word count of new files created at that location: as we add new files, we get the word count for each of them. Let's get started.


Open the Spark shell:

spark2-shell

You will find that a SparkContext is already available to you as 'sc'.


Prepare Application

Now, let's write the code for our streaming app to stream data from the HDFS location.

//import libraries
import org.apache.spark._
import org.apache.spark.streaming._

//App code
object StreamingWordCount {
  def main(args: Array[String]) {

    //Create a streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    //Read lines from new files at the HDFS location
    val lines = ssc.textFileStream(args(0))

    //Split lines into words and count occurrences of each word
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    //Start processing the stream
    ssc.start()
    ssc.awaitTermination()
  }
}

In the above code, we create a StreamingContext in Spark using:

val ssc = new StreamingContext(sc, Seconds(10))

Passing Seconds(10) sets the streaming batch interval to 10 seconds, so Spark collects incoming data into a new batch every 10 seconds.
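The transformations themselves are ordinary functional operations. As a rough illustration of what happens within a single batch, here is the same word-count logic applied to a plain Scala collection (the sample lines are made up, and groupBy plus a sum stands in for the distributed reduceByKey):

```scala
// Plain-Scala sketch of the per-batch word-count logic.
// The sample lines are hypothetical; on a DStream, reduceByKey
// performs the equivalent of the groupBy + sum below.
val lines = Seq("spark streaming word count", "spark word")
val words = lines.flatMap(_.split(" "))
val wordCounts = words
  .map(w => (w, 1))
  .groupBy(_._1)
  .map { case (w, ones) => (w, ones.map(_._2).sum) }

wordCounts.foreach(println)
```

Each batch of the stream goes through the same pipeline independently, so the printed counts cover only that batch's files unless you add stateful operations.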

In case a SparkContext is not already available, use the following code to create one.

val sparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))

Note: Make sure no other SparkContext is running, or you will get the following error saying that only one SparkContext can run per JVM.

org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-XXXX). 

You may change this behavior by setting spark.driver.allowMultipleContexts to true on the SparkConf:

val sparkConf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")

Run Application

Once your application code is ready, call the main function and pass the HDFS location as a parameter, as shown below.

 StreamingWordCount.main(Array("/user/ad/RealTimeProcessing/ssc/"))

This will start your streaming application.

Now add files to the HDFS location you passed in. As soon as files are added, word counts will start appearing on the console running the streaming application.
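On a cluster you would typically copy files in with hdfs dfs -put, but textFileStream can also watch a local directory, which is handy for trying this out on a single machine. A minimal sketch that drops a new file into a watched directory from code (the directory and file names here are made up):

```scala
// Sketch: write a new file into the watched directory.
// The directory and file names are hypothetical; on a real
// cluster you would copy files in with `hdfs dfs -put` instead.
import java.nio.file.Files

val watchedDir = Files.createTempDirectory("ssc-demo")
val newFile = watchedDir.resolve("batch1.txt")
Files.write(newFile, "spark streaming word count".getBytes("UTF-8"))
// textFileStream only picks up files created after the stream
// starts, so add files once the application is already running.
```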


You may keep adding new files to the HDFS location and get the word count for each batch. I added a Scala source file and got the following output.


Here, we have successfully used Spark Streaming to read streaming data from an HDFS location and processed it to get the word count for each batch.


Happy Learning folks!!