Sentiment Analysis with Apache Spark

Posted on 19 November 2015

I gave a short presentation (an 'ignite' session: 20 slides in 5 minutes) during JFall in which I introduced sentiment analysis. The talk was based on a CFP submission whose focus was more on doing something fun with Spark and a bit less on the sentiment analysis itself. In this post I am going to go into a bit more detail than was possible in a 5 minute presentation.

The code and the presentation can be found at this repository.

Introduction

The goal of the talk was to demonstrate how a big chunk of data can be analyzed using Apache Spark. There is a lot of data to choose from and I picked a large reddit dataset (more on this later) for the analysis. I also wanted to do something a bit more fun than the typical post counts, so I decided to incorporate a simple form of sentiment analysis that would give me an impression of how positive and/or negative certain subreddits (subs) are.

The data

In June 2015 someone posted a set of torrents containing huge chunks of downloaded reddit data. Eventually the entire set of all publicly available comments between 2007 and June 2015 was made available through a torrent: it is over 1TB in size (compressed JSON) and contains over 1.7 billion comments. Each comment is a single JSON object and the objects are separated by newlines.

I downloaded all the data, but since it doesn’t even fit on my poor little laptop (not enough hard disk space) I decided to run the analysis on a small subset: all the comments of January 2015. This subset is still 5GB of compressed JSON (13GB uncompressed).

The JSON objects themselves contain a lot of information, such as votes, IDs and permalinks, that isn’t used in the analysis, so these fields were thrown out. The only fields used were the subreddit, timestamp, author and the comment text.

Sentiment Analysis

Sentiment Analysis is basically trying to find out if a piece of text is positive, negative or neutral. To be able to analyze a piece of text you first need to create (or in my case: download) a list of words (or even combinations of words) with scores attached to them.

Creating such a list from scratch is a lot of hard work. You either have to manually annotate almost every word in the language of choice with scores, or use machine learning on annotated text to derive them. Since this was quite a bit beyond the scope of this analysis I decided to use the freely available SentiWordNet list. It has positive and negative scores for a lot of words in the English language; around 40k words in total.

There are a few downsides to this approach though. First of all, the word list makes some rather dubious choices for quite a few words. For example the word 'humble' has a negative score while 'humbly' has a positive score. The same goes for 'humiliated' (negative) versus 'humiliation' (positive).

Secondly, it has no notion of context. A 'not' in front of a word can simply invert the sentiment: the word 'grace' is strongly positive, but 'without grace' should in most cases score negative, while 'not without grace' is closer to neutral. This word list, combined with my word-by-word approach, does not understand this form of context.

If you want to score individual pieces of text, like customer reviews, this is a big issue. But since we’re just scoring averages on rather large volumes of relatively short texts it should still give us interesting results.

Apache Spark

Spark is quickly becoming the standard framework for Big Data analysis. For any Java developer, especially those experienced with reactive programming and/or the new Java 8 streams API, it is really easy to get started with. A quick example that shows how easy it is to do a word count (the 'Hello World' of Spark) can be found here. And even though it processes a relatively small file here, it would work exactly the same on larger files; Spark lets you read a whole directory of compressed files just as easily.
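
For reference, a minimal word count using the Java 8 lambda API could look something like this (a sketch; the input file name is just a placeholder):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf config = new SparkConf().setAppName("WordCount").setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(config);

        // Split every line into words, map every word to a (word, 1) pair
        // and sum the counts per word.
        JavaPairRDD<String, Integer> counts = sc.textFile("input.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Nothing is executed until this action runs.
        counts.collect().forEach(System.out::println);

        sc.stop();
    }
}

Note that this sketch uses the Spark 1.x Java API (flatMap returning an Iterable), matching the Spark version current at the time of writing.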

Where Java 8 provides streams, Spark provides RDDs. An RDD, a resilient distributed dataset, is essentially an immutable, distributed collection of data. Any operation you perform on it results in a new RDD, which is again immutable. RDDs can also be cached and reused: you can use a single RDD as the input for several different operations. How you cache an RDD is up to you; you can use memory, disk or both.

So what happens under the hood? Nothing, until you 'collect' the data from an RDD in some way. In the example above, Spark only starts the actual work when the final .collect() is called on the RDD. Spark then handles the distribution of the workload over the connected nodes.
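
To illustrate, here is a small sketch (the file name and length threshold are placeholders): nothing is read until the count() at the end, and persist() tells Spark to keep the intermediate result in memory and spill to disk if needed.

// The transformations below only describe the work; no data is read yet.
JavaRDD<String> lines = sc.textFile("comments.json");
JavaRDD<String> longLines = lines
        .filter(line -> line.length() > 100)
        .persist(StorageLevel.MEMORY_AND_DISK());

// Only this action triggers the actual reading and filtering;
// a second action on longLines would reuse the persisted data.
long count = longLines.count();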

The Code

Sentiment Analysis

The Sentiment Analysis code is very simple. The SentiWordNet wordlist is loaded into the WordList class, which is basically a Map from words to a score. Each comment is scored by looking up each word:

// getBody() returns the comment text split into individual words.
double score = 0.0;

for(String word : comment.getBody()) {
    WordList.Score scoreObj = list.get(word);

    // Words that don't appear in the word list simply don't contribute to the score.
    if(scoreObj != null) {
        score += scoreObj.getPositive();
        score -= scoreObj.getNegative();
    }
}

return AnalysedComment.of(comment, score);

A comment is 'positive' if it has a normalized score (score / number of words) of 0.1 or higher and negative if the score is -0.1 or lower. Between -0.1 and 0.1 a comment is 'neutral'.

An example:

For an input sentence “Such an abhorrent sense of betrayal” with 6 words we score the individual words: “Such (-0.125) an (-0.125) abhorrent (-0.75) sense (-0.125) of betrayal (-0.25)”; the word 'of' has no entry in the word list, so it contributes nothing.

The sum of the scores is: -1.375

The normalized score is: score / words = -1.375 / 6 ≈ -0.23

This is how every comment gets assigned a normalized score which is then used to filter on positive, negative and neutral scores.
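
As a sketch, the classification inside AnalysedComment could look something like this (the field and constant names are my own, not necessarily those of the actual class in the repository):

// Classify a comment based on its normalized score (score / number of words).
private static final double THRESHOLD = 0.1;

private final double normalizedScore;

public boolean isPositive() {
    return normalizedScore >= THRESHOLD;
}

public boolean isNegative() {
    return normalizedScore <= -THRESHOLD;
}

public boolean isNeutral() {
    return !isPositive() && !isNegative();
}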

Spark

The Spark functionality is all contained in the SparkFacade class. To be able to use a Spark instance we simply create a JavaSparkContext like so:

SparkConf config = new SparkConf().setAppName("HelloSparkWorld").setMaster("local[1]");
//Create a context
JavaSparkContext sc = new JavaSparkContext(config);

The appName is just a name that allows you to distinguish your application from others when running on a cluster; since we're running locally with a single thread (that is what local[1] means) you can pick anything here. If you have a Spark cluster available you can connect to it by passing its master URL to setMaster.
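
For example, the only thing that changes when you move from a local run to a standalone cluster is the master URL (the host and port below are placeholders):

// Run locally, using all available cores instead of a single thread.
SparkConf local = new SparkConf().setAppName("HelloSparkWorld").setMaster("local[*]");

// Run on a standalone cluster by pointing setMaster at the cluster's master URL.
SparkConf cluster = new SparkConf().setAppName("HelloSparkWorld").setMaster("spark://master-host:7077");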

The analysis is done in two stages for convenience. The first stage reads the input file, scores the comments and then serializes these objects to a temporary directory:

// Read the raw JSON file and map every line to a Comment, optionally dropping deleted comments.
private JavaRDD<Comment> asCommentStream(String file, boolean filterDeleted) {
    return sc.textFile(file)
            .map(Mappers::toComment)
            .filter(c -> !filterDeleted || !c.isDeleted());
}

// Stage one: parse, score and serialize the comments to a temporary directory.
public void toObjectFile(String inFile, String outFile, Analyser analyser) {
    asCommentStream(inFile, true)
        .map(analyser::analyse)
        .saveAsObjectFile(outFile);
}

The reason is that deserializing the JSON is rather slow, a lot slower than the reduce steps. So if you want to add more reduce steps (to create more reports) you can do so without having to redo the costly JSON deserialization.
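
The second stage then reads those serialized objects back and runs the reduce steps on them. A sketch of what that read could look like (the method name here is my own; the actual code is in the SparkFacade class in the repository):

// Read the analyzed comments back from the object file written by toObjectFile.
public JavaRDD<AnalysedComment> fromObjectFile(String file) {
    return sc.objectFile(file);
}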

The reduce steps look like this:

public void sentimentAnalysis(JavaRDD<AnalysedComment> comments, File output) {
    List<Tuple2<String, Integer>> results;

    // Positive comments per subreddit
    results = comments
            .filter(AnalysedComment::isPositive)
            .mapToPair(c -> new Tuple2<>(c.getSubReddit(), 1))
            .reduceByKey((a, b) -> a + b)
            .collect();

    writeTuples(results, new File(output, "subPositive.csv"));

    // Positive comments per day of the week
    results = comments
            .filter(AnalysedComment::isPositive)
            .mapToPair(c -> new Tuple2<>(toDayOfWeek(c.getDateTime()), 1))
            .reduceByKey((a, b) -> a + b)
            .collect();

    writeTuples(results, new File(output, "dayPositive.csv"));

    // Total number of comments per author
    results = comments
            .mapToPair(c -> new Tuple2<>(c.getAuthor(), 1))
            .reduceByKey((a, b) -> a + b)
            .collect();

    writeTuples(results, new File(output, "authorTotal.csv"));
}

I have omitted a few reports since they all work basically the same way. The 'comments' RDD contains analyzed comments with the author, timestamp, subreddit and sentiment score. For each report I filter the comments I want to count, map each comment to a (key, value) tuple and then reduce by key, ending up with the counts I want. These then get collected into a list and written to a CSV file by the writeTuples helper sketched below.
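
A minimal version of the writeTuples helper could look like this (my sketch, not necessarily the repository's implementation):

// Write every (key, count) tuple as a single "key,count" line.
private void writeTuples(List<Tuple2<String, Integer>> tuples, File file) {
    try (PrintWriter writer = new PrintWriter(file)) {
        for (Tuple2<String, Integer> tuple : tuples) {
            writer.println(tuple._1() + "," + tuple._2());
        }
    } catch (FileNotFoundException e) {
        throw new UncheckedIOException(e);
    }
}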

To explain the individual steps, let’s imagine we already have a few (10) analyzed comments and want to count the positive comments per subreddit, as shown in this piece of code:

    results = comments
            .filter(AnalysedComment::isPositive)
            .mapToPair(c -> new Tuple2<>(c.getSubReddit(), 1))
            .reduceByKey((a, b) -> a + b)
            .collect();

So our input RDD looks like this:

Table 1. Analysed Comments

subreddit    score
aww          0.25
aww          0.15
aww          0.05
funny        0.50
funny        0.45
funny        0.25
funny        0.90
politics     -0.15
politics     0.25
politics     -0.45

We filter the comments with a score >= 0.1:

Table 2. Filtered Comments

subreddit    score
aww          0.25
aww          0.15
funny        0.50
funny        0.45
funny        0.25
funny        0.90
politics     0.25

We map each of them to a (subreddit, 1) pair using mapToPair:

Table 3. Mapped Pairs

subreddit    count
aww          1
aww          1
funny        1
funny        1
funny        1
funny        1
politics     1

And then reduce them by key with a summation function that gives us the counts:

Table 4. Reduced Pairs

subreddit    count
aww          2
funny        4
politics     1

This table is then collected into a List and written to CSV.

So why the separate steps? Why not just iterate over the data and count? The big benefit of the map-reduce programming model is that separating these steps makes it much easier to execute the computations in parallel. All the map operations are completely independent of each other, and only the final reduce results need to be brought together on a single node to produce the list of results.

In a single-threaded setting the map-reduce model gives no benefit, but the same code can be executed on a thousand-node Spark cluster without any changes.

Lessons learned

Spark is fun, easy to use and easy to set up, but there are a number of issues / pitfalls that you tend to run into when you’re starting out.

Serializable

Most important: the data classes, and anything they contain, need to be serializable. This makes perfect sense if you think about it; the nodes exchange this data with each other. Fortunately Spark gives pretty clear errors if something isn't serializable.

It is also important to understand that writing an RDD to an object file in fact writes the serialized objects. If you later want to read them back you need to make sure the serialVersionUID matches, so it's best to set it explicitly yourself. Keep in mind that if you then change the class in a way that breaks serialization compatibility, you also have to bump the UID yourself.
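
As a sketch, such a data class could look something like this (the fields below are illustrative, not the actual Comment class from the repository):

import java.io.Serializable;

public class Comment implements Serializable {

    // Set explicitly so previously written object files remain readable;
    // bump it when the class changes in a serialization-incompatible way.
    private static final long serialVersionUID = 1L;

    private final String subReddit;
    private final String author;
    private final long createdUtc;
    private final String body;

    public Comment(String subReddit, String author, long createdUtc, String body) {
        this.subReddit = subReddit;
        this.author = author;
        this.createdUtc = createdUtc;
        this.body = body;
    }
}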

BZ2 reading

If you're reading a BZ2-compressed file with multiple threads and you end up with index-out-of-bounds exceptions, there's a good chance you're using a version with a bug in the BZ2 reader that made it non-thread-safe. This cost me a good amount of time.

Reusing an RDD

If you do multiple reduce operations on the same RDD you really want to make sure it's cached. If you don't, the whole RDD gets reconstructed all the way from the source for every operation. Fortunately it's easy to cache an RDD in memory: just call .cache() on it.
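
For example, the reduce steps shown earlier all reuse the same comments RDD; caching it once means the object file is only read and deserialized a single time (a sketch, with a placeholder path):

// Cache once; both actions below reuse the in-memory data
// instead of re-reading and deserializing the object file.
JavaRDD<AnalysedComment> comments = sc.<AnalysedComment>objectFile("tmp/analysed").cache();

long positive = comments.filter(AnalysedComment::isPositive).count();
long total = comments.count();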

Conclusion

I hope you enjoyed this small introduction to doing sentiment analysis with Spark! Let me know if you have any questions (via the contact page or through GitHub).