Extract Streaming Data with Apache Flume from Twitter to HDFS

Today we are going to do something exciting: we will use another tool from the Hadoop ecosystem to work with real-time streaming data. We will use Apache Flume to connect to the social networking site Twitter, download data related to our topics of interest, and store it in the Hadoop Distributed File System (HDFS) for further processing.


Before we proceed to our demo, let’s check some basics.


What is Apache Flume?

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.


Flume Architecture

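Flume has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic applications. A data flow is run by a Flume agent, a JVM process that hosts three kinds of components: a source, which receives events; a channel, which buffers them; and a sink, which removes events from the channel and delivers them to their destination.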

Source: flume.apache.org
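To make the source, channel, and sink roles concrete, here is a toy, in-process model of that flow in Python. It is not Flume's API and not how Flume is implemented (a real agent runs sources and sinks concurrently inside a JVM). The names MemoryChannel and run_flow are hypothetical, and the capacity and batch_size parameters only loosely mirror the memory channel's capacity and the HDFS sink's batchSize used later in this article.

```python
from collections import deque


class MemoryChannel:
    """Bounded FIFO buffer, loosely modeled on a Flume memory channel (toy only)."""

    def __init__(self, capacity: int) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self._events = deque()

    def is_full(self) -> bool:
        return len(self._events) >= self.capacity

    def put(self, event: str) -> None:
        # A full channel refuses new events; run_flow checks is_full() first.
        if self.is_full():
            raise OverflowError("channel is full")
        self._events.append(event)

    def take_batch(self, batch_size: int) -> list:
        # Hand over up to batch_size events in arrival order.
        batch = []
        while self._events and len(batch) < batch_size:
            batch.append(self._events.popleft())
        return batch


def run_flow(events, capacity: int, batch_size: int) -> list:
    """Move events source -> channel -> sink and return the batches the sink received."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    channel = MemoryChannel(capacity)
    delivered = []
    for event in events:  # the "source" side
        if channel.is_full():
            # Sequential simplification: let the "sink" drain one batch to make room.
            delivered.append(channel.take_batch(batch_size))
        channel.put(event)
    while True:  # the "sink" drains whatever is left in the channel
        batch = channel.take_batch(batch_size)
        if not batch:
            break
        delivered.append(batch)
    return delivered
```

For example, run_flow([f"tweet-{i}" for i in range(7)], capacity=3, batch_size=2) delivers all seven events in their original order, in batches of at most two.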


Identifying our Source, Channel and Sink

For our case of extracting streaming data from Twitter to HDFS, the source is Twitter, the channel is a memory channel (events are buffered in the agent's memory), and the sink, or destination, is HDFS.


Source: Create a Twitter App

To download tweets from Twitter, we need a Twitter app, which provides the API credentials Flume uses to connect. Creating one is simple; follow these steps:

  • Go to apps.twitter.com. Sign up and create a Twitter developer account.

  • Once you are in the Developer Portal, under Projects & Apps, scroll down to Stand-alone applications and click the ‘+Create App’ button.

  • Enter a unique app name and click Next.

  • The next page shows your API Key and API Key Secret; these are your consumer key and consumer secret. Copy them and keep them for later use. Move on by clicking App Settings.

  • On the App Settings page, click Keys and Tokens at the top.

  • On this page, under Authentication Tokens, generate an access token and access token secret.

  • Copy your Access Token and Access Token Secret from this screen.

Now your source app is ready.
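Rather than pasting the four credentials into the configuration file by hand, one option is to generate those lines from environment variables. This is a minimal Python sketch under stated assumptions: the environment variable names (TWITTER_CONSUMER_KEY and so on) and the render_credentials function are hypothetical; only the property keys come from the configuration file later in this article.

```python
import os
from typing import Mapping

# Flume source property -> hypothetical environment variable that holds its value.
CREDENTIAL_VARS = {
    "consumerKey": "TWITTER_CONSUMER_KEY",
    "consumerSecret": "TWITTER_CONSUMER_SECRET",
    "accessToken": "TWITTER_ACCESS_TOKEN",
    "accessTokenSecret": "TWITTER_ACCESS_TOKEN_SECRET",
}


def render_credentials(
    env: Mapping[str, str] = os.environ,
    agent: str = "TwitterAgent",
    source: str = "Twitter",
) -> str:
    """Return the credential properties as config lines; raise KeyError if any are missing."""
    missing = [var for var in CREDENTIAL_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    lines = [
        f"{agent}.sources.{source}.{prop} = {env[var]}"
        for prop, var in CREDENTIAL_VARS.items()
    ]
    return "\n".join(lines) + "\n"
```

For example, print(render_credentials()) prints the four TwitterAgent.sources.Twitter.* lines from the current environment, and raises KeyError naming any variable that is not set.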


Sink: HDFS location

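Create an HDFS directory where the sink will write the data; we will reference it in the Flume configuration file. I created a directory named flume_tweets at /user/ad/flume_tweets; you can create one with hdfs dfs -mkdir -p /user/ad/flume_tweets.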


Flume configuration file

The Flume configuration file is the most important part of this setup. In it we define and configure all our sources, channels, and sinks, and we also list the keywords/topics about which we want to extract data from Twitter. Let’s go through the configuration file that we will use in this case.

vi flume_twitter.conf
#we are using ‘TwitterAgent’ as the Flume Agent Name
#Define/Name the source, channel & sink.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#Configure Source
#Earlier we created an app on Twitter and got its ‘consumer key’, ‘consumer secret’, ‘access token’ and ‘access token secret’. Enter these values below; they allow the agent to connect to the app and receive data.
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =CONSUMER KEY
TwitterAgent.sources.Twitter.consumerSecret =CONSUMER SECRET
TwitterAgent.sources.Twitter.accessToken =ACCESS TOKEN
TwitterAgent.sources.Twitter.accessTokenSecret =ACCESS TOKEN SECRET

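#Enter all the keywords/topics for which you want to download tweets.
#Note: the Flume User Guide lists no keywords property for org.apache.flume.source.twitter.TwitterSource, which reads Twitter's 1% sample stream, so this line may have no effect with this source type; keyword filtering is offered by other Twitter sources such as Cloudera's example com.cloudera.flume.source.TwitterSource.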
TwitterAgent.sources.Twitter.keywords = spark,scientist,hadoop,big data,analytics,bigdata,cloudera,data science,data scientist,business intelligence,mapreduce,data warehouse,data warehousing,mahout,hbase,nosql,newsql,businessintelligence,cloudcomputing

#Configure Sink
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
#Enter the HDFS location where the data will be written
TwitterAgent.sinks.HDFS.hdfs.path = /user/ad/flume_tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

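#Channel configuration
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
#transactionCapacity defaults to 100; keep it at least as large as the sink's hdfs.batchSize (1000)
TwitterAgent.channels.MemChannel.transactionCapacity = 1000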

Our configuration file is now ready.
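A typo in a component name is easy to make and easy to miss, so a quick offline check of the wiring can help before starting the agent. The sketch below is a hypothetical helper, not part of Flume: parse_properties handles only simple `key = value` lines (no Java-properties line continuations or `:` separators), and check_wiring verifies that sources, channels, and sinks are declared, that each has a type, and that every source and sink points at a declared channel.

```python
def parse_properties(text: str) -> dict:
    """Parse simple `key = value` lines, ignoring blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_wiring(props: dict, agent: str) -> list:
    """Return human-readable problems; an empty list means the wiring looks consistent."""
    problems = []
    # Component lists are whitespace-separated, e.g. "TwitterAgent.sinks = HDFS".
    declared = {
        kind: props.get(f"{agent}.{kind}", "").split()
        for kind in ("sources", "channels", "sinks")
    }
    channels = set(declared["channels"])
    for kind, names in declared.items():
        if not names:
            problems.append(f"{agent}.{kind} is not declared")
        for name in names:
            if f"{agent}.{kind}.{name}.type" not in props:
                problems.append(f"{kind[:-1]} {name} has no type")
    # Sources use the plural ".channels" key, sinks the singular ".channel" key.
    for src in declared["sources"]:
        targets = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not targets:
            problems.append(f"source {src} has no channels")
        problems += [f"source {src} uses undeclared channel {c}" for c in targets if c not in channels]
    for sink in declared["sinks"]:
        target = props.get(f"{agent}.sinks.{sink}.channel", "")
        if target not in channels:
            problems.append(f"sink {sink} uses undeclared channel {target or '(none)'}")
    return problems
```

For example, check_wiring(parse_properties(open("flume_twitter.conf").read()), "TwitterAgent") should return an empty list for the file above, while passing "AgentName" reports that no sources, channels, or sinks are declared for that agent.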


How to run the Flume agent?

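We run a Flume agent with the ‘flume-ng agent’ command. These are the parameters we need to specify:

  • -n - name of the agent. It must match the agent name used as the prefix in the configuration file (TwitterAgent in our case).

  • -c - directory containing Flume’s own configuration files, such as flume-env.sh and log4j.properties.

  • -f - configuration file for the agent, which defines its sources, channels, and sinks.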

Let’s run our Flume agent now to extract data from Twitter to HDFS:

flume-ng agent -n TwitterAgent -c conf -f /mnt/home/ad/flume_twitter.conf

The Flume agent will now connect to Twitter using your app’s credentials, start receiving tweets, and store them in files within the HDFS directory.
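Since the value passed to -n has to match the agent name used in the configuration file, the command line can be derived from the file itself. The following Python sketch does that under assumptions: the helper names agent_names and flume_command are hypothetical, any top-level `<name>.sources = ...` line is treated as an agent declaration, and the command is only built, not run.

```python
import re
import shlex

# A top-level declaration such as "TwitterAgent.sources = Twitter" names an agent.
AGENT_DECL = re.compile(r"\s*([^#\s.=]+)\.sources\s*=")


def agent_names(conf_text: str) -> list:
    """Return the distinct agent names declared in a Flume config, in file order."""
    names = []
    for line in conf_text.splitlines():
        match = AGENT_DECL.match(line)
        if match and match.group(1) not in names:
            names.append(match.group(1))
    return names


def flume_command(conf_text: str, conf_file: str, conf_dir: str = "conf") -> list:
    """Build the flume-ng argv with -n taken from the config; refuse zero or several agents."""
    names = agent_names(conf_text)
    if len(names) != 1:
        raise ValueError(f"expected exactly one agent, found {names}")
    return ["flume-ng", "agent", "-n", names[0], "-c", conf_dir, "-f", conf_file]


if __name__ == "__main__":
    sample = (
        "TwitterAgent.sources = Twitter\n"
        "TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource\n"
    )
    # Prints: flume-ng agent -n TwitterAgent -c conf -f /mnt/home/ad/flume_twitter.conf
    print(shlex.join(flume_command(sample, "/mnt/home/ad/flume_twitter.conf")))
```

Returning an argument list rather than a string makes it easy to pass straight to subprocess.run if you later want to launch the agent from a script.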


You can check the file in the HDFS location:

hdfs dfs -ls /user/ad/flume_tweets

Result:

[ad@servername flume]$ hdfs dfs -ls /user/ad/flume_tweets
Found 1 items
-rw-r--r-- 3 ad hadoop  48553511 2021-07-31 18:56 /user/ad/flume_tweets/FlumeData.1627757575015

While the agent is still writing to a file, its name carries a .tmp suffix, for example ‘FlumeData.1627757575015.tmp’.
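If you want to script this check, for instance to see which files are finished and safe to process, the listing can be split on that suffix. A small Python sketch, assuming the usual eight-column `hdfs dfs -ls` layout and the HDFS sink's default in-use suffix `.tmp`; split_listing is a hypothetical helper name.

```python
def split_listing(listing: str):
    """Return (in_progress, finished) lists of (path, size_in_bytes) from `hdfs dfs -ls` output."""
    in_progress, finished = [], []
    for line in listing.splitlines():
        # Columns: permissions, replication, owner, group, size, date, time, path.
        parts = line.split(None, 7)
        # Skip the "Found N items" header, directories, and anything malformed.
        if len(parts) != 8 or not parts[0].startswith("-"):
            continue
        size, path = int(parts[4]), parts[7]
        (in_progress if path.endswith(".tmp") else finished).append((path, size))
    return in_progress, finished
```

For example, pass it the output of subprocess.run(["hdfs", "dfs", "-ls", "/user/ad/flume_tweets"], capture_output=True, text=True).stdout.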


You can check the contents of the file. The file is in Avro format; you can view it in Hue either as binary or as text.
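To confirm the format outside Hue, you can copy a file to local disk (for example with hdfs dfs -get) and look at its first bytes: the Avro specification says an object container file starts with the bytes ‘O’, ‘b’, ‘j’ followed by a byte with value 1. This Python sketch only performs that check; looks_like_avro is a hypothetical helper, and actually decoding the tweets would need an Avro library.

```python
import sys

# Magic bytes that open every Avro object container file (Avro specification).
AVRO_MAGIC = b"Obj\x01"


def looks_like_avro(path: str) -> bool:
    """Return True if the file at `path` begins with the Avro container magic bytes."""
    with open(path, "rb") as handle:
        # Files shorter than four bytes simply fail the comparison.
        return handle.read(len(AVRO_MAGIC)) == AVRO_MAGIC


if __name__ == "__main__":
    # Usage: python check_avro.py FlumeData.1627757575015 [more files...]
    for name in sys.argv[1:]:
        print(name, "Avro" if looks_like_avro(name) else "not Avro")
```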



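Once you stop the Flume agent (press Ctrl+C), the sink closes the file and renames it to ‘FlumeData.1627757575015’. The sink also closes and renames a file whenever it rolls, which with our settings happens after 10000 events or 600 seconds, whichever comes first.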

[ad@servername flume]$ hdfs dfs -ls /user/ad/flume_tweets
Found 1 items
-rw-r--r-- 3 ad hadoop  48553511 2021-07-31 18:56 /user/ad/flume_tweets/FlumeData.1627757575015
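When a file gets closed is governed by the three roll settings in the sink configuration: rollSize = 0 disables size-based rolling, rollCount = 10000 rolls after that many events, and rollInterval = 600 rolls 600 seconds after the file was opened. The Python sketch below is a simplified model of those rules, not the sink's code. simulate_rolls is a hypothetical name; it checks the time limit only when the next event arrives, whereas the real sink closes the file on a timer, and rollCount counts Flume events rather than bytes or lines.

```python
def simulate_rolls(events, roll_count=10000, roll_interval=600, roll_size=0):
    """Simplified model of the HDFS sink's roll rules.

    events: iterable of (timestamp_seconds, size_bytes) pairs in arrival order.
    A value of 0 disables the corresponding trigger, as in the Flume docs.
    Returns (closed, open_count): the event count of each closed file and the
    number of events in the file that is still open (the .tmp file).
    """
    closed = []
    opened_at = None  # time of the first event written to the current file
    count = size = 0
    for timestamp, nbytes in events:
        # Time-based roll. Simplification: checked when the next event arrives,
        # while the real sink closes the file on a timer.
        if opened_at is not None and roll_interval and timestamp - opened_at >= roll_interval:
            closed.append(count)
            opened_at, count, size = None, 0, 0
        if opened_at is None:
            opened_at = timestamp  # the first event opens a new file
        count += 1
        size += nbytes
        # Count- and size-based rolls are checked after each write.
        if (roll_count and count >= roll_count) or (roll_size and size >= roll_size):
            closed.append(count)
            opened_at, count, size = None, 0, 0
    return closed, count
```

With the article's settings (the defaults above), a file is closed after 10000 events unless 600 seconds pass first.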

You now have real-time streaming data being written to your Hadoop location. You can analyze this data to get further insights. For continuous analysis, keep the Flume agent running instead of stopping it.
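As a possible first analysis step, you could count how many tweets mention each topic from the keywords line of the configuration. This Python sketch assumes the tweet texts have already been decoded from the Avro files into plain strings (that step needs an Avro library and is not shown); count_topics is a hypothetical helper that matches each keyword case-insensitively as a whole word or phrase.

```python
import re
from collections import Counter

# Copied from the keywords line of flume_twitter.conf in this article.
KEYWORDS = (
    "spark,scientist,hadoop,big data,analytics,bigdata,cloudera,data science,"
    "data scientist,business intelligence,mapreduce,data warehouse,data warehousing,"
    "mahout,hbase,nosql,newsql,businessintelligence,cloudcomputing"
)


def keyword_patterns(keywords_csv: str) -> dict:
    """Compile one case-insensitive whole-word/phrase pattern per comma-separated keyword."""
    keywords = [kw.strip() for kw in keywords_csv.split(",") if kw.strip()]
    return {kw: re.compile(r"\b" + re.escape(kw) + r"\b", re.IGNORECASE) for kw in keywords}


def count_topics(texts, keywords_csv: str = KEYWORDS) -> Counter:
    """Count how many texts mention each keyword (a text counts once per keyword)."""
    patterns = keyword_patterns(keywords_csv)
    counts = Counter()
    for text in texts:
        for keyword, pattern in patterns.items():
            if pattern.search(text):
                counts[keyword] += 1
    return counts
```

Note that overlapping keywords are counted independently, so a tweet mentioning "data scientist" also counts toward "scientist".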


I hope you had a great time learning today.

Happy Learning!!