
Flume & Spark Streaming Integration

Updated: Aug 21, 2021

In this post, we will integrate Flume with Spark Streaming. Flume will be our source of streaming data, and Spark Streaming will process the data it delivers.


Flume is considered most suitable for sourcing data from log files. We will stream log file data using Flume and process it with Spark Streaming, keeping only the lines that begin with a date-time stamp. Let’s get started.


Flume Stream Data – Input for Spark Streaming

The input data is a log file generated by commands on Linux; Flume delivers it to its sink, an HDFS location. Flume will create files at the HDFS location with a timestamp in their names.


Each line of the event log file follows the format <date-time> <log-type> <message>.
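To see what the parsing will do, here is a tiny standalone Scala sketch that runs the same regular expression the streaming job uses later against one hypothetical log line (the sample line is made up; only its <date-time> <log-type> <message> shape matters):

object RegexCheck {
  def main(args: Array[String]): Unit = {
    // same pattern used in LogParser.scala below: <date-time> <log-type> <message>
    val regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r
    // hypothetical sample line, for illustration only
    val sample = "2021/08/21 10:15:30 INFO Application started"
    sample match {
      case regex(timestamp, etype, message) =>
        println(s"timestamp=$timestamp  type=$etype  message=$message")
      case _ =>
        println("line does not match the expected format")
    }
  }
}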


Prepare the Flume configuration file

vi stlog.conf
# Name the components on this agent
stlog.sources = r1
stlog.sinks = k1
stlog.channels = c1

# Describe/configure the source
stlog.sources.r1.type = exec
stlog.sources.r1.command = tail -F -s 2 /mnt/home/ad/rtp/ssc/event2.log

# Describe the sink
stlog.sinks.k1.type = hdfs
stlog.sinks.k1.hdfs.path = /user/ad/rtp/ssc/%Y-%m-%d/
stlog.sinks.k1.hdfs.filePrefix = events-
stlog.sinks.k1.hdfs.fileSuffix = .log
stlog.sinks.k1.hdfs.useLocalTimeStamp = true
stlog.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
stlog.channels.c1.type = memory
stlog.channels.c1.capacity = 1000
stlog.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
stlog.sources.r1.channels = c1
stlog.sinks.k1.channel = c1

Here,

  • tail -F prints the last 10 lines of the file and then keeps waiting for any new lines appended to the source file. It ends only when you terminate it with Ctrl + C, thus creating a continuous stream of data (a small sketch for generating such appends follows this list).

  • -s 2 tells tail to sleep for 2 seconds between checks for new data.
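For testing, you need something that keeps appending lines to event2.log so that tail -F always has fresh data to emit. Below is a minimal Scala sketch of such a generator; it is only an illustration, and the message text is an assumption (the file path and the 2-second rhythm mirror the setup above):

import java.io.{FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date

// Appends one line in the <date-time> <log-type> <message> format every
// 2 seconds to the file that tail -F is watching. Message text is made up.
object EventLogGenerator {
  def main(args: Array[String]): Unit = {
    val fmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
    while (true) {
      val out = new PrintWriter(new FileWriter("/mnt/home/ad/rtp/ssc/event2.log", true))
      out.println(s"${fmt.format(new Date())} INFO generated test event")
      out.close()
      Thread.sleep(2000)
    }
  }
}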


Once the configuration file is ready, we will run the Flume agent:

flume-ng agent -n stlog -f /mnt/home/ad/rtp/ssc/stlog.conf -Dflume.root.logger=INFO,console


Once the agent is running, you will find log files created at the HDFS location, with the local timestamp in the file names.

Now our file stream data is being fetched by Flume and is ready to be processed by Spark Streaming. The Flume agent will keep running continuously until you stop it.
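Before writing the streaming job, you can sanity-check that data is really landing in HDFS with a one-off batch read. This is just a rough sketch; the glob pattern is an assumption derived from the sink configuration above (dated folders, events- prefix, .log suffix):

import org.apache.spark.{SparkConf, SparkContext}

// One-off batch read of whatever Flume has written so far, only to verify
// that files are arriving. The glob pattern is an assumption based on the
// HDFS sink configuration.
object HdfsSanityCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FlumeSinkSanityCheck"))
    val lines = sc.textFile("/user/ad/rtp/ssc/*/events-*.log")
    println(s"lines landed so far: ${lines.count()}")
    lines.take(5).foreach(println)
    sc.stop()
  }
}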


Streaming data from Flume and processing it with Spark Streaming

We now need to pick up the stream data from the HDFS location populated by Flume and process it with our Spark Streaming code.


Let’s create an sbt package. For this, create the directory structure sbt expects.

cd /mnt/home/ad/rtp/ssc/
mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala

Now let’s create the build file – build.sbt. Add the following content.

vi build.sbt
name                   := "SparkFlumeProject"
version                := "1.0"
organization           := "DataGuy"
scalaVersion           := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
resolvers += Resolver.mavenLocal           

Spark Streaming Code

Let’s write our code for processing the data. Flume has read the event log file from the local file system on Linux and moved it to HDFS as ‘events-<local timestamp>.log’ inside a ‘yyyy-mm-dd’ folder. We will use a regular expression to pick out the log lines that start with a timestamp and then save them to another file on the local file system on Linux.

vi LogParser.scala
import org.apache.spark._
import org.apache.spark.streaming._

// one parsed log line: <date-time> <log-type> <message>
case class LogEvent(timestamp: String, etype: String, message: String)

object LogParser {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Logparser Streaming")
    val ssc = new StreamingContext(conf, Seconds(4))

    // define the regular expression - <date-time> <log-type> <message>
    val regex = raw"^([\d/]+ [\d:]+) ([a-zA-Z]+) (.*)".r

    // read new files as they appear in the HDFS directory
    val lines = ssc.textFileStream("/user/ad/rtp/ssc/")

    // keep only lines matching the pattern and turn them into LogEvent records
    val events = lines.flatMap { line =>
      regex.findFirstMatchIn(line).map(m => LogEvent(m.group(1), m.group(2), m.group(3)))
    }
    events.print(4)

    ssc.start()
    ssc.awaitTermination()
  }
}

Place this file in the src/main/scala/ folder.
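The job above only prints matched events to the console; the spark2-submit step later relies on redirecting that console output to a file. If you would rather have Spark Streaming write the results out itself, a hedged variation is to replace the events.print(4) line with something like the following (the output prefix is just an example path; saveAsTextFiles creates a new events-<batch time>.out directory per batch interval):

// variation: persist each batch instead of printing it
events.map(e => s"${e.timestamp} ${e.etype} ${e.message}")
      .saveAsTextFiles("/user/ad/rtp/ssc/parsed/events", "out")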

Then, let’s package our files into a jar:

cd /mnt/home/ad/rtp/ssc/sbt/
sbt package

You will find the jar file created as ‘sparkflumeproject_2.11-1.0.jar’

ls /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/

Check the jar contents to confirm that the LogParser class is present:

jar tf /mnt/home/ad/rtp/ssc/sbt/target/scala-2.11/sparkflumeproject_2.11-1.0.jar

We will now submit the Spark Streaming job:

spark2-submit --class LogParser --deploy-mode client target/scala-2.11/sparkflumeproject_2.11-1.0.jar > /mnt/home/ad/rtp/ssc/event2.log  2>&1

The job will keep running until you stop it. Since its output is redirected to event2.log, the very file Flume is tailing, the processed lines become new input in turn, and the cycle goes on, forming a continuous stream of data.

We can check the content of the processed data placed at /mnt/home/ad/rtp/ssc/event2.log:

cat /mnt/home/ad/rtp/ssc/event2.log

Wonderful! We can see the log data has been processed and moved to the new file. You may now stop both the Flume agent and the Spark Streaming job. In the real world, though, you would let them keep running so that any new content landing in the file is moved from the local file system to HDFS by Flume, processed by Spark Streaming, and placed back on the destination local file system.


Hope you enjoyed the data processing.

Happy Learning folks!! :)
