# Analyzing Tweets from a Streaming Socket

Group Project for Big Data Programming, Fall 2017

### Project Contributors:
Caleb Hulburt   
Mohammad Azim   
Yao Jin   
Xian Lai   

========================================================

![](../images/logos.png)

This application analyzes streaming tweets in a distributed manner using the Spark framework. Spark provides a stream-processing API called Spark Streaming, which works in mini-batches: it chops a continuous stream into discrete chunks at a given time interval and stores each chunk as an RDD. Operations are then applied to this discretized stream of RDDs, called a DStream.
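
To make the mini-batch idea concrete, the following pure-Python sketch (no Spark involved) groups timestamped records into consecutive fixed-length intervals, which is roughly what Spark Streaming does before turning each interval into an RDD. The function name `micro_batches` and the `(timestamp, record)` input format are illustrative assumptions; timestamps are taken to be non-negative seconds since the stream started.

```python
from collections import defaultdict


def micro_batches(events, interval):
    """Group (timestamp_seconds, record) pairs into fixed-length batches.

    Batch k holds every record with k * interval <= timestamp < (k + 1) * interval,
    loosely mimicking how a DStream is a sequence of per-interval RDDs.
    Returns the batches in time order; intervals with no records give empty lists.
    """
    buckets = defaultdict(list)
    for ts, record in events:
        buckets[int(ts // interval)].append(record)
    if not buckets:
        return []
    last = max(buckets)
    return [buckets.get(k, []) for k in range(last + 1)]


events = [(0.5, "a"), (3.2, "b"), (5.0, "c"), (12.7, "d")]
print(micro_batches(events, 5))  # [['a', 'b'], ['c'], ['d']]
```

Each inner list plays the role of one RDD in the DStream, and a quiet interval still yields an (empty) batch.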

#### Tweepy:
First, we use the Tweepy API to pull tweet streams tracking #MAGA and #resist. The stream is then forwarded into Spark Streaming through a TCP socket.

#### Spark:
Inside Spark, raw tweets are cleaned into words and then analyzed in the following steps:
1. We calculate the term frequency of each word, sort the words by frequency, and take the top 5000 under each label.
2. We use these words (at most 10,000, fewer when the two lists overlap) as features and encode the MAGA and resist tweets into structured datasets. Each feature value is either True or False, indicating whether the feature word appears in the tweet.
3. We calculate the conditional probability of each feature given each label.
4. Finally, we calculate the "informativeness" of each feature from these conditional probabilities.
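
The four steps above can be sketched on a small in-memory list of labeled tweets using only the standard library. This is an illustrative single-machine version, not the Spark code used in this notebook; the function name `informative_features` and the toy data are hypothetical. Unlike the Spark code, which counts every occurrence of a word, this sketch counts each word at most once per tweet to match the True/False encoding of step 2, and it omits the notebook's rule of dropping words seen only once. As in the notebook, a word missing under one label gets a count of 1 for that label.

```python
from collections import Counter


def informative_features(tweets, top_k=5000):
    """Rank feature words by informativeness for the labels 'maga' and 'resist'.

    tweets: list of (label, words) pairs, like the output of the cleaning step.
    Returns (word, informativeness) pairs sorted from most to least informative.
    Both labels must occur at least once in `tweets`.
    """
    n = Counter(label for label, _ in tweets)              # number of tweets per label
    df = {"maga": Counter(), "resist": Counter()}
    for label, words in tweets:
        df[label].update(set(words))                       # step 1: count each word once per tweet
    top = {lab: dict(c.most_common(top_k)) for lab, c in df.items()}  # top-k words per label
    features = set(top["maga"]) | set(top["resist"])       # step 2: union of both top-k lists
    scores = []
    for w in features:
        # step 3: p(word | label); a word missing under a label counts as 1 (no division by zero)
        p_m = top["maga"].get(w, 1) / n["maga"]
        p_r = top["resist"].get(w, 1) / n["resist"]
        # step 4: informativeness = the larger of the two probability ratios
        scores.append((w, max(p_m / p_r, p_r / p_m)))
    return sorted(scores, key=lambda s: s[1], reverse=True)


tweets = [
    ("maga", ["wall", "border", "great"]),
    ("maga", ["wall", "great"]),
    ("maga", ["wall"]),
    ("resist", ["vote", "great"]),
    ("resist", ["vote", "protest"]),
]
for word, score in informative_features(tweets):
    print(word, round(score, 2))
```

On this toy data "vote" (only in resist tweets) and "wall" (in every MAGA tweet) rank highest, while "great", used under both labels, scores close to 1.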

The benefits of using Spark for stream processing:
1. The computation is distributed across machines, so it scales: latency can be kept under control whether the stream is small or large.
2. Streaming analysis is built on RDDs. As stated in the overview, RDDs can be kept in memory, which suits interactive or iterative analysis such as machine learning.
3. Spark Streaming is a high-level API, so you don't need to worry about low-level details.

#### Bokeh:
Finally, we collect the analysis results and plot the 100 feature words with the highest informativeness interactively using Bokeh.

```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from time import strftime, gmtime, sleep
import TweetsStreamingPlot as tsp
from math import log
import subprocess
import re
```

To receive tweets in Spark, we first run the tweet listener script `TweetsListener.py` as a background subprocess. It pulls tweets and sends them to port 5555 at IP address `172.17.0.2`, which is the address of this Docker container.
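
`TweetsListener.py` itself is not shown here. Because Spark's `socketTextStream` connects to the given host and port as a TCP client and reads newline-delimited UTF-8 text, the listener has to act as the server side. Below is a minimal standard-library sketch of that server half only, with the Tweepy part replaced by a plain iterable of strings; the function `serve_lines` and its `on_listen` callback are hypothetical, not taken from the project.

```python
import socket


def serve_lines(lines, host="127.0.0.1", port=5555, on_listen=None):
    """Accept one TCP client and send it each line followed by a newline.

    socketTextStream splits the incoming bytes on '\n', so newlines inside a
    tweet are replaced with spaces to keep one tweet per line.
    on_listen (optional) is called with the actual listening port, which is
    handy when port=0 lets the OS pick a free port.
    """
    with socket.create_server((host, port)) as server:
        if on_listen is not None:
            on_listen(server.getsockname()[1])
        conn, _ = server.accept()  # blocks until Spark (or any client) connects
        with conn:
            for line in lines:
                clean = line.replace("\n", " ")
                conn.sendall((clean + "\n").encode("utf-8"))
```

Since the receiver in this notebook connects to `172.17.0.2:5555`, a listener built this way would be started with that host (or `0.0.0.0`) and port 5555, and fed tweet texts instead of a fixed list.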

```python
# launch the tweet listener in the background; the Popen handle itself is not needed
_ = subprocess.Popen(['python', 'TweetsListener.py'])
```

# Global variables:

```python
# In order to filter stop words out of tweets, we load the stop-word vocabulary as a set
# (set lookups are fast, and every word of every tweet is checked against it).
with open("../data/stop_words.txt", 'r') as f:
    STOPWORDS = {word for word in f.read().splitlines() if word != ''}
```

```python
data = {'color':[], 'size':[], 'x':[], 'y':[], 'text':[]}  # the data source for plotting
plot = tsp.StreamingPlot(width=900, height=900)            # the streaming plotting object
logs = open('batch_log.txt', 'w')                          # the file saving intermediate and final results
```

```python
def timeHeader():
    """Return a timestamped (UTC) header line that separates batches in the log file."""
    return ("\n----------------------------"
            "Time: " + strftime("%Y-%m-%d %H:%M:%S", gmtime()) +
            "----------------------------\n")
```

We set up the Spark context in local mode with 3 worker threads (`local[3]`), simulating 3 machines. The socket receiver permanently occupies one of these threads, so at least 2 are needed for any batch processing to happen. We then build a Spark streaming context on top of the Spark context with a batch interval of 5 seconds, so incoming tweets are collected into one RDD every 5 seconds.

```python
conf = SparkConf().setMaster('local[3]')   # 3 local worker threads
sc   = SparkContext(conf=conf)
ssc  = StreamingContext(sc, 5)             # 5-second batch interval
```

# Tweet cleaning:

We receive the tweets sent by the listener script through the assigned port. Because raw tweets are strings full of emojis and odd symbols, we preprocess them: we split each line into words, remove useless symbols, transform all words to lower case, remove stop words, and assign one of the two labels to each tweet.

```python
def assign_label(words):
    """Assign a label to a tweet. If the tweet contains the word 'resist',
    it is labeled resist; otherwise, if it contains 'maga', it is labeled
    maga. If it contains neither word, None is returned.
    At the same time, the label word is removed from the words.

    Inputs: words: one tweet in the form of a list of cleaned words.
    Output: labeled tweet: (label, words), or None
    """
    if 'resist' in words:
        words = [x for x in words if x != 'resist']
        return ('resist', words)
    if 'maga' in words:
        words = [x for x in words if x != 'maga']
        return ('maga', words)
    return None
```

```python
def writeRDD(rdd):
    """Log the first raw tweet of each batch under a timestamp header."""
    global logs
    logs.write(timeHeader())
    logs.write("\nRaw Tweets:\n{}".format(rdd.take(num=1)))
    logs.flush()

raw_tweets = ssc.socketTextStream('172.17.0.2', 5555)
raw_tweets.foreachRDD(writeRDD)
# raw_tweets.pprint()

# for each rdd in stream:
# split the string by whitespace
# remove chars that are not letters or digits (underscores included)
# lower-case all the words
# remove empty strings
# remove stop words
# remove empty lists
# assign a label to each tweet
# remove tweets that don't belong to any label
clean_tweets = raw_tweets\
    .map(lambda x: x.split())\
    .map(lambda x: [re.sub(r'([^\s\w]|_)+', '', y) for y in x])\
    .map(lambda x: [word.lower() for word in x])\
    .map(lambda x: [word for word in x if word != ''])\
    .map(lambda x: [word for word in x if word not in STOPWORDS])\
    .filter(lambda x: x != [])\
    .map(assign_label)\
    .filter(lambda x: x is not None)
# clean_tweets.pprint()
```
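
To check what the cleaning chain does to a single tweet, the same sequence of steps can be run in plain Python without Spark. The helper `clean_tweet` and the small stop-word sets in the example are hypothetical stand-ins for the lambdas above and for `../data/stop_words.txt`; the label rule repeats `assign_label` (resist takes priority over maga).

```python
import re


def clean_tweet(line, stopwords):
    """Apply the notebook's cleaning steps to one raw tweet line.

    Returns (label, words), or None when no words are left or the tweet
    mentions neither label.
    """
    words = line.split()                                           # split on whitespace
    words = [re.sub(r'([^\s\w]|_)+', '', w) for w in words]        # drop symbols and underscores
    words = [w.lower() for w in words]                             # lower-case
    words = [w for w in words if w != '' and w not in stopwords]   # drop empties and stop words
    if not words:
        return None
    for label in ('resist', 'maga'):                               # same priority as assign_label
        if label in words:
            return label, [w for w in words if w != label]
    return None


print(clean_tweet("RT @user: The #MAGA rally was HUGE!!!", {"rt", "the", "was"}))
# ('maga', ['user', 'rally', 'huge'])
```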

# Analysis:

After cleaning, the tweets form a stream of RDDs. Each RDD holds the tweets from the last 5 seconds, one tweet per row as a tuple (label, words). We then use the `window` method of Spark Streaming to gather all tweets from the last 60 seconds and run the analysis on them.

![](../images/DStreams.png)
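
With a 5-second batch interval, `window(60)` spans 60 / 5 = 12 batches. Since no slide duration is passed, the slide defaults to the batch interval, so the analysis is re-run every 5 seconds over overlapping 60-second windows (Spark also requires the window length to be a multiple of the batch interval). The plain-Python sketch below mimics this bookkeeping with a bounded `deque`; the class `SlidingWindow` is hypothetical and counts batches rather than wall-clock time.

```python
from collections import deque


class SlidingWindow:
    """Keep the most recent batches that fit in window_seconds.

    Mimics DStream.window(window_seconds) with the default slide of one batch
    interval: every push() returns the records of the latest
    window_seconds // batch_seconds batches, oldest first.
    """

    def __init__(self, window_seconds=60, batch_seconds=5):
        if window_seconds % batch_seconds != 0:
            raise ValueError("window must be a multiple of the batch interval")
        self.batches = deque(maxlen=window_seconds // batch_seconds)

    def push(self, batch):
        self.batches.append(list(batch))  # the oldest batch drops out automatically
        return [item for b in self.batches for item in b]
```

After 13 pushes of one record each, the window holds records 1 through 12: the first batch has slid out, just as tweets older than 60 seconds leave the analysis.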

### Functions for analysis:
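The informativeness of a feature is defined as the larger of the two ratios between its conditional probabilities under each label:

$$\max\left( \frac{p(\text{feature} \mid \text{label}=\text{resist})}{p(\text{feature} \mid \text{label}=\text{maga})}, \frac{p(\text{feature} \mid \text{label}=\text{maga})}{p(\text{feature} \mid \text{label}=\text{resist})}\right)$$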

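Intuitively, it measures how much a feature word tells you when classifying a tweet as MAGA or resist: a value near 1 means the word is about equally likely under both labels, while a large value means it strongly favors one of them.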
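
A quick numeric check of the definition, with made-up probabilities: a word that appears in half of the MAGA tweets and in one of every eight resist tweets has informativeness max(0.5 / 0.125, 0.125 / 0.5) = 4, and swapping the two probabilities gives the same score, so the measure says how strongly a word leans, not which way. The sketch mirrors `calculate_informativeness` from the next cell without its rounding; the name `informativeness` is ours.

```python
def informativeness(p_maga, p_resist):
    """Larger of the two conditional-probability ratios; always >= 1.

    Both probabilities must be positive; the notebook ensures this by giving
    a word missing under one label a count of 1 for that label.
    """
    if p_maga <= 0 or p_resist <= 0:
        raise ValueError("conditional probabilities must be positive")
    return max(p_maga / p_resist, p_resist / p_maga)


print(informativeness(0.5, 0.125))  # 4.0
print(informativeness(0.125, 0.5))  # 4.0
print(informativeness(0.2, 0.2))    # 1.0
```

Equivalently, the logarithm of this score is the absolute difference of the two log conditional probabilities.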

```python
def count_by_label(rdd):
    """Count the number of tweets per label and flatten the rows
    from (label, words) to one (label, word) row per word.
    """
    global logs

    # count the number of maga tweets
    # count the number of resist tweets
    # flatten the words so that each row is (label, word)
    n_maga   = float(rdd.filter(lambda x: x[0] == 'maga').count())
    n_resist = float(rdd.filter(lambda x: x[0] == 'resist').count())
    new_rdd  = rdd.flatMapValues(lambda x: x)

    logs.write('\n# Resist Tweets: {}\t # MAGA Tweets: {}'\
        .format(int(n_resist), int(n_maga)))
    logs.flush()

    return new_rdd, n_resist, n_maga
```


```python
def fetch_features(rdd):
    """Fetch the top 5000 features (most frequent words) for each label."""
    global logs

    # input rdd has rows: (label, word)
    # use the label and word as key and add count 1: ((label, word), 1)
    # add up the counts: ((label, word), count)
    # reorganize: (label, (word, count))
    # (no sort is needed here: takeOrdered below selects the top words directly)
    new_rdd = rdd.map(lambda x: (x, 1))\
        .reduceByKey(lambda a, b: a + b)\
        .map(lambda x: (x[0][0], (x[0][1], x[1])))

    # maga features are the top 5000 words with the highest count
    m_feats = new_rdd.filter(lambda x: x[0] == 'maga')\
        .takeOrdered(5000, key=lambda x: -x[1][1])
    # resist features are the top 5000 words with the highest count
    r_feats = new_rdd.filter(lambda x: x[0] == 'resist')\
        .takeOrdered(5000, key=lambda x: -x[1][1])

    logs.write("\nMAGA features:\n{}".format(m_feats[:10]))
    logs.write("\nresist features:\n{}".format(r_feats[:10]))
    logs.flush()

    return m_feats, r_feats
```


```python
def collect_feature_counts(maga_feats, resist_feats):
    """Collect the feature words for each label as a list, and the
    word:count pairs for each label as a dictionary.
    """
    words, counts = {}, {}
    # create a dictionary: label:feature_words
    words['m'] = [x[1][0] for x in maga_feats]
    words['r'] = [x[1][0] for x in resist_feats]

    # create a dictionary: label:feature_word_counts
    # feature_word_counts is a dictionary of word:count pairs
    counts['m'] = {x[1][0]:x[1][1] for x in maga_feats}
    counts['r'] = {x[1][0]:x[1][1] for x in resist_feats}

    return words, counts
```


```python
def combine_feature_counts(counts):
    """Create a combined dictionary with pairs {word: [count_maga, count_resist]}.
    Counts of 1 are ignored, so a word is kept only if it occurs at least twice
    under at least one label. A word that appears in only one label's features
    gets a count of 1 for the other label to avoid division by zero.
    """
    global logs
    combined_counts = {}

    # if a word appears only in maga tweets, we add word: [count_maga, 1]
    # if a word appears in both, we add word: [count_maga, count_resist]
    # if a word appears only in resist tweets, we add word: [1, count_resist]
    for word, count in counts['m'].items():
        if count <= 1:
            continue
        combined_counts[word] = [count, counts['r'].get(word, 1)]
    for word, count in counts['r'].items():
        if count <= 1:
            continue
        if word not in combined_counts:
            combined_counts[word] = [1, count]

    logs.write("\nCombined feature_counts_by_label: {}".format(list(combined_counts.items())[:10]))
    logs.flush()

    return combined_counts
```


```python
def calculate_informativeness(cps):
    """Take the conditional probabilities of one feature and
    calculate the informativeness of this feature.

    inputs:
    -------
    cps[0]: the conditional probability of this feature word given label maga
    cps[1]: the conditional probability of this feature word given label resist
    """
    return round(max(cps[0]/cps[1], cps[1]/cps[0]), 5)
```

### Functions for preparing plotting data:

```python
def min_max_scale(list_):
    """Normalize a list to the range [0, 1] (all zeros if every value is equal)."""
    max_, min_ = max(list_), min(list_)
    if max_ == min_:
        return [0.0 for _ in list_]
    return [(x - min_)/(max_ - min_) for x in list_]
```


```python
def plot_data(infs, words):
    """Transform the informativeness results into the plotting data source.
    Each feature word becomes a scatter circle: the scaled log conditional
    probability given maga is x, the one given resist is y, the informativeness
    sets the circle size, the word is the text, and the color is derived from
    x and y so that it reflects which label the word leans towards.

    Inputs:
    -------
    - infs: a list of tuples, each in the form
        (feature_word, (log_cp_maga, log_cp_resist, informativeness))
    - words: the list of feature words for each label (currently unused).
    """
    global data, logs

    text, other = zip(*infs)
    x, y, size  = zip(*other)
    data['x']   = min_max_scale(x)
    data['y']   = min_max_scale(y)

    # 1 - x + y grows as a word's resist probability rises relative to its maga probability
    color = [1 - x_ + y_ for x_, y_ in zip(data['x'], data['y'])]
    color = min_max_scale(color)
    color = [tsp.cms['RdBu'][int(c/0.1)] for c in color]
    data['color'] = color

    scaled_size  = min_max_scale(size)
    data['size'] = [s/20 for s in scaled_size]
    data['text'] = text

    # log the raw (unscaled) values of the first 10 features
    logs.write('\nFeature\tLog_cond_prob_maga\tLog_cond_prob_resist\tInformativeness')
    for f, x_, y_, i in zip(text[:10], x[:10], y[:10], size[:10]):
        logs.write('\n{}\t{:.3f}\t{:.3f}\t{:.3f}'.format(f, x_, y_, i))
    logs.flush()
```

```python
def analysis(rdd):
    """Take in the RDD of cleaned tweets and perform the analysis. Save the
    results to the log file and update the data source of the interactive plot.
    """
    # count the number of tweets per label
    new_rdd, n_resist, n_maga = count_by_label(rdd)
    # the conditional probabilities below divide by both counts,
    # so skip windows that do not yet contain tweets of both labels
    if n_maga == 0 or n_resist == 0:
        return
    maga_feats, resist_feats = fetch_features(new_rdd)
    words, counts = collect_feature_counts(maga_feats, resist_feats)
    combined_counts = combine_feature_counts(counts)
    if not combined_counts:
        return

    # we parallelize the combined_counts onto the cluster,
    # each row in the form: (word, (count_maga, count_resist))
    # calculate the conditional probabilities for each label
    # take the log of the conditional probabilities and calculate informativeness from the raw probabilities
    # sort the rows by informativeness and take the top 5000 rows
    infs = sc.parallelize(list(combined_counts.items()))\
        .mapValues(lambda x: (x[0]/n_maga, x[1]/n_resist))\
        .mapValues(lambda x: (log(x[0]), log(x[1]), calculate_informativeness(x)))\
        .sortBy(lambda x: x[1][2], ascending=False)\
        .take(5000)

    plot_data(infs[:100], words)


# re-run the analysis every batch (5 s) over the tweets of the last 60 seconds
clean_tweets.window(60).foreachRDD(analysis)
ssc.start()
```

```python
plot.start(data)
```