A few weeks back we used Flume to download tweets from Twitter on a topic of our choice (I used Big Data keywords).
Today we will level up and analyze those tweets to determine their sentiment. This lets us do our own analysis of what is trending in the world. Say a country introduces a new policy: we can analyze the tweets about it and check whether the majority of people are happy or unhappy with the decision. We will use Spark Streaming to do this.
Pre-requisites:
You need a Twitter developer account with its API key and secret (consumer key and consumer secret) and its access token and access token secret. If you don't have them yet, create them by following my earlier post: https://www.databare.com/post/extract-streaming-data-with-apache-flume-from-twitter-to-hdfs
We will use AFINN for sentiment analysis. In case you have not come across it: AFINN is an English word list developed by Finn Årup Nielsen. Word scores range from minus five (negative) to plus five (positive), and the English-language dictionary consists of 2,477 coded words. For further reading, check out this post from Neal Caren: https://nealcaren.org/lessons/wordlists/
Download the word list from: https://www2.imm.dtu.dk/pubdb/views/edoc_download.php/6010/zip/imm6010.zip
Extract it and use AFINN-111.txt. You will need to upload it to an HDFS location.
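To make the file format concrete, here is a minimal sketch of turning AFINN-111.txt lines into a word-to-score map in plain Scala, without Spark. It assumes each line is a word or phrase, a tab, and an integer score. The object name AfinnLexicon, the helper names, and the sample lines are made up for illustration; check the real file for the actual entries.

```scala
object AfinnLexicon {
  // Parse one line of the form "<word or phrase>\t<integer score>".
  // Returns None when the line does not have exactly two tab-separated
  // fields or when the score is not an integer.
  def parseLine(line: String): Option[(String, Int)] =
    line.split("\t") match {
      case Array(word, score) =>
        scala.util.Try(score.trim.toInt).toOption.map(s => (word, s))
      case _ => None
    }

  // Build the lookup table, silently skipping malformed lines.
  def load(lines: Iterator[String]): Map[String, Int] =
    lines.flatMap(parseLine(_)).toMap

  def main(args: Array[String]): Unit = {
    // Illustrative sample lines, not copied from the real file.
    val sample = Iterator("happy\t3", "sad\t-2", "a line without a tab")
    println(load(sample))
  }
}
```

Unlike a plain pattern match on the split result, this version skips a malformed line instead of throwing, which may be preferable if the file ever picks up a blank or damaged line.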
Spark Streaming Code for Twitter analysis
Now let’s prepare our code for the Twitter analysis. We will start with the sbt directory structure and a build.sbt file that defines all the dependencies.
mkdir sbt
cd sbt
mkdir src
mkdir src/main
mkdir src/main/scala
Create build.sbt and add the following content:
vi build.sbt
name := "SparkMe Project"
version := "1.0"
organization := "DataGuy"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
// Same Bahir version as the one passed to --packages when submitting the job
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"
resolvers += Resolver.mavenLocal
Tweets Analysis Code
Let’s create our program; it goes in the src/main/scala folder. Make sure you update the AFINN file location (wordSentimentFilePath) to the HDFS path where you uploaded AFINN-111.txt.
vi TwitterSentiments.scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterSentiments {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: TwitterSentiments <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    // First four arguments are the credentials; anything after that is a filter keyword
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that Twitter4j library used by Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")

    // check Spark configuration for master URL, set it to local if not configured
    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[2]")
    }

    // 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // change log level to warning
    ssc.sparkContext.setLogLevel("WARN")

    val stream = TwitterUtils.createStream(ssc, None, filters)
    val textElems = stream.map(status => status.getText)
    textElems.print()

    // Despite the name, this holds every space-separated word of each tweet, not only hash tags
    val hashTags = textElems.flatMap(_.split(" "))

    // Read in the word-sentiment list and create a static RDD from it
    val wordSentimentFilePath = "/user/ad/rtp/ssc/twitterSentiment/AFINN-111.txt"
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
      val Array(word, happinessValue) = line.split("\t")
      (word, happinessValue.toInt)
    }.cache()

    // Determine the words with the highest sentiment values by joining the streaming RDD
    // with the static RDD inside the transform() method and then multiplying
    // the frequency of the word by its sentiment value
    val happiest10 = hashTags.map(hashTag => (hashTag, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(10))
      .transform { topicCount => wordSentiments.join(topicCount) }
      .map { case (topic, tuple) => (topic, tuple._1 * tuple._2) }
      .map { case (topic, happinessValue) => (happinessValue, topic) }
      .transform(_.sortByKey(false))

    // Print the ten highest-scoring words of the current window
    happiest10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
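To see what the windowed join computes without starting a cluster, the same steps can be sketched on an ordinary in-memory collection. This is only an approximation of the streaming logic under stated assumptions: the object name WindowScoreSketch, the function rank, and the lexicon scores are hypothetical, and ties are broken alphabetically here, whereas sortByKey leaves their order unspecified.

```scala
object WindowScoreSketch {
  // Mirrors the pipeline on a plain batch of words:
  // count each word, keep only words found in the lexicon (the join),
  // multiply count by sentiment score, then sort from happiest to unhappiest.
  def rank(words: Seq[String], lexicon: Map[String, Int]): Seq[(Int, String)] = {
    val counts: Map[String, Int] =
      words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
    counts.toSeq
      .collect { case (w, c) if lexicon.contains(w) => (lexicon(w) * c, w) }
      .sortBy { case (score, w) => (-score, w) }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical scores, chosen only for the demonstration.
    val lexicon = Map("win" -> 4, "fire" -> -2)
    val words = "what a win win for the team on fire".split(" ").toSeq
    rank(words, lexicon).foreach { case (score, w) =>
      println(s"$w ($score happiness)")
    }
  }
}
```

Words that are missing from the lexicon drop out entirely, just as they do in the inner join of the streaming job, so only scored words ever appear in the ranking.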
Place this file in the src/main/scala/ folder, then compile and package it. Run the following commands one after the other from the project root (the sbt directory that contains build.sbt):
sbt compile
sbt package
Once done you will find the jar file created at: target/scala-2.11/sparkme-project_2.11-1.0.jar
Running Analysis
We will now submit the Spark Streaming job in the following format:
spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client target/scala-2.11/sparkme-project_2.11-1.0.jar <consumer key> <consumer secret> <access token> <access token secret> "keyword"
Let’s analyze some cricket tweets:
spark2-submit --class TwitterSentiments --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0 --deploy-mode client /mnt/home/ad/rtp/ssc/twitterSentiment/target/scala-2.11/sparkme-project_2.11-1.0.jar xxxxxxx xxxxxxx xxxxxxx xxxxxxx "cricket"
We will see the following data on the console:
Since this is streaming data, the output refreshes with every 2-second batch, and each result covers the tweets received in the last 10 seconds.
Here you can see that AFINN gave "happiness" a positive score and treated "fire" as a negative word. AFINN scores each word in isolation, without looking at its context, so the analysis will not be 100% accurate.
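The "fire" case is easy to reproduce in isolation. The sketch below sums per-word scores for a piece of text, which is roughly what a word-list approach amounts to. The object name WordLevelLimit, the function score, and the single lexicon entry are assumptions made for the illustration, not values taken from the AFINN file.

```scala
object WordLevelLimit {
  // Sum the lexicon scores of the lowercase, whitespace-separated tokens.
  // Words that are not in the lexicon contribute 0.
  def score(text: String, lexicon: Map[String, Int]): Int =
    text.toLowerCase
      .split("\\s+")
      .filter(_.nonEmpty)
      .map(w => lexicon.getOrElse(w, 0))
      .sum

  def main(args: Array[String]): Unit = {
    // Hypothetical entry: assume the word list rates "fire" as negative.
    val lexicon = Map("fire" -> -2)
    // Praise for a player and a report of an accident get the same score,
    // because only the word itself is looked up.
    println(score("The batsman is on fire today", lexicon))
    println(score("There was a fire at the stadium", lexicon))
  }
}
```

Both sentences come out equally negative, even though the first one is praise. Handling such cases needs context-aware techniques, which a plain word list does not provide.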
Now that you know how to get trends and analyze sentiments, do try it yourself and share your experience.
Hope today’s learning was useful as well as fun.
Happy Learning folks!!