Twitter Sentiments Analysis

Few weeks back we had used flume to download tweets from twitter on the topic that we wanted (I had used BigData world keywords).

Today, we will level up and analyze the tweets and determine sentiments of tweets. We can do our own analysis on what is trending in the world. Let’s say if there is a new policy in place in a country, we can analyze the tweets and check whether majority of people are happy or unhappy with the decision. We will use Spark Streaming to do this.


Extract it and use AFINN-111.txt. You would need to upload it to a HDFS


Spark Streaming Code for Twitter analysis

Now let’s prepare our code for twitter analysis. We will start with our sbt directory structure and build.sbt file defining all dependencies.

mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala

Create build.sbt and add the following content:

vi build.sbt
name         := "SparkMe Project"
version      := "1.0"
organization := "DataGuy"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.0"
resolvers += Resolver.mavenLocal

Tweets Analysis Code

Let’s create our program and we will upload this to src/main/scala location. Make sure you update the AFINN file location.

vi TwitterSentiments.scala 
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterSentiments {
  def main(args: Array[String]) {
    if (args.length < 4) {
     System.err.println("Usage: TwitterSentiments <consumer key> <consumer secret> " +
       "<access token> <access token secret> [<filters>]")

    // Set logging level if log4j not configured (override by adding to classpath)
    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that Twitter4j library used by Twitter stream
    // can use them to generate OAuth credentials
   System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
   System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
   System.setProperty("twitter4j.oauth.accessToken", accessToken)
   System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")

    // check Spark configuration for master URL, set it to local if not configured
    if (!sparkConf.contains("spark.master")) {

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // change log level to warning

    val stream = TwitterUtils.createStream(ssc, None, filters)
    val textElems = => status.getText)
    val hashTags =textElems.flatMap(_.split(" "))
    // Read in the word-sentiment list and create a static RDD from it
    val wordSentimentFilePath = "/user/ad/rtp/ssc/twitterSentiment/AFINN-111.txt"
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
      val Array(word, happinessValue) = line.split("\t")
      (word, happinessValue.toInt)

    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
    // with the static RDD inside the transform() method and then multiplying
    // the frequency of the hash tag by its sentiment value

    val happiest10 = => (hashTag, 1))
     .reduceByKeyAndWindow(_ + _, Seconds(10))
     .transform{topicCount => wordSentiments.join(topicCount)}
      .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
      .map{case (topic, happinessValue) => (happinessValue, topic)}

   happiest10.foreachRDD(rdd => {
      val topList = rdd.take(10)
     println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
     topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}


Place this file within src/main/scala/ folder, and then compile and run sbt package. Run following commands one after other:

sbt compile
sbt package                              

Once done you will find the jar file created at: target/scala-2.11/sparkme-project_2.11-1.0.jar

Running Analysis

We will now submit Spark Streaming code in following format:

spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client target/scala-2.11/sparkme-project_2.11-1.0.jar <consumer key> <consumer secret> <access token> <access token secret> “keyword”

Let’s analyze some cricket tweets:

spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client /mnt/home/ad/rtp/ssc/twitterSentiment/target/scala-2.11/sparkme-project_2.11-1.0.jar xxxxxxx xxxxxxx xxxxxxx xxxxxxx "cricket"

We will see following data on console:

Since this is streaming data, we analyzed the sentiments every 10 seconds.

Here you can see, AFINN scored happiness and gave positive score and it considered fire as negative word. Here, we have seen that AFINN only goes by words and thus the analysis may not be 100% accurate.

Now that you know how to get trends and analyze sentiments, do try it yourself and share your experience.

Hope today’s learning was useful as well as fun.

Happy Learning folks!!