Connecting a Naiad program to Twitter

Posted on October 24, 2012

One of the most compelling advantages of incremental computation is that it lets you react to incoming data while it is freshest and most valuable. In particular, social networks are producing data at ever-increasing rates and, because much of this data is conversational, we need the expressiveness of a system like Naiad to perform sophisticated analyses on it. In this post, I’ll show you how to connect a simple Naiad program to a stream of tweets from Twitter, and discuss some of the ways that Naiad can be used to analyze this type of data.

Preliminaries

In this post, I will assume that you have downloaded and built the Naiad source release, and that you have a Twitter account. We will be using the downsampled status stream from the Twitter streaming API, which allows registered Twitter users to observe a fraction of public tweets as they are posted. To test your access to this feed, attempt to open the URL https://stream.twitter.com/1.1/statuses/sample.json in your favorite browser or HTTP client, and be prepared to enter your Twitter username and password when prompted.

A simple Naiad program

As an example, we will write a Naiad program that accumulates running counts of each hashtag posted to Twitter while the program is running. The following Naiad program will perform the necessary computation, and print counts to the console as they are updated:

static void Main(string[] args)
{
    var config = Configuration.FromArgs(ref args);
    using (var controller = new Controller(config))
    {
        var tweets = controller.NewInput<Tweet>();

        tweets.Where(x => x.text != null)
              .SelectMany(x => x.text.Split())
              .Where(x => x.StartsWith("#"))
              .Select(x => x.ToLower())
              .Count(x => x, (x, c) => new Pair<string, int>(x, c))
              .Where(x => x.t > 1)
              .Subscribe(xs => { foreach (var x in xs) Console.WriteLine(x); });

        // Tweet loading code will go here.
    }
}

The above code creates a new Naiad input with records of type Tweet (which we’ll discuss more in a moment) (line 6), filters out those tweets with null text (line 8), splits the text on whitespace (line 9), filters out non-hashtags (line 10), normalizes the hashtags to lower case (line 11), groups by hashtag and takes a count (line 12), filters out hashtags that appear only once (line 13), and finally prints the hashtags and counts when they change (line 14).
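To see what the pipeline computes without involving Naiad at all, here is a rough batch analogue written with ordinary LINQ to Objects. It is only a sketch: the class and method names are mine, the input is a fixed array rather than a live stream, and nothing here is incremental, so the whole count is recomputed from scratch on every call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HashtagCounts
{
    // Hypothetical helper: counts hashtags over a fixed collection of tweet texts,
    // mirroring the operators in the Naiad pipeline step by step.
    public static Dictionary<string, int> CountHashtags(IEnumerable<string> texts)
    {
        return texts
            .Where(text => text != null)                // drop records with no text
            .SelectMany(text => text.Split())           // split on whitespace
            .Where(word => word.StartsWith("#"))        // keep only hashtags
            .Select(word => word.ToLower())             // normalize case
            .GroupBy(tag => tag)                        // group identical hashtags
            .Where(g => g.Count() > 1)                  // drop hashtags seen only once
            .ToDictionary(g => g.Key, g => g.Count());
    }

    public static void Main()
    {
        var texts = new[] { "Happy #FF everyone", null, "more #ff picks", "#NowPlaying a song" };
        foreach (var entry in CountHashtags(texts))
            Console.WriteLine("{0} {1}", entry.Key, entry.Value);
    }
}
```

With this sample input the only hashtag that survives the final filter is #ff, with a count of 2. The Naiad version produces the same counts, but updates them as each new tweet arrives instead of rescanning the input.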

The input data type is a Tweet, which is a user-defined type that models the data we receive from Twitter. Since we are only interested in the text of the tweet, we can define a simple type as follows:

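using System;
using System.Runtime.Serialization;
// ...
[DataContract]
public struct Tweet : IEquatable<Tweet>
{
    // Populated from the "text" key of each JSON object in the feed.
    [DataMember]
    public string text;

    // Two tweets are considered equal when their text matches.
    public bool Equals(Tweet that)
    {
        return this.text == that.text;
    }
}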

The main requirement that Naiad places on user-defined types is that they must implement IEquatable<T>. (The current version further requires the types to be struct (value) types, which we may relax in a future release. Defining simple types as structs is often preferable in performance terms anyway, because it can create less work for the garbage collector.) The unusual thing about our Tweet type is that the [DataContract] attribute is applied to the struct, and the [DataMember] attribute is applied to its text field. These annotations will be used to generate deserialization code for the Twitter feed, and they represent one of the simplest possible uses of data contracts in .NET. For more information about this feature, read the comprehensive tutorial on MSDN. When deserializing a tweet, the Data Contract code will attempt to populate each [DataMember]-annotated field in the struct, leaving absent fields at their default value (null for a string), and you can add more [DataMember]-annotated fields to capture other metadata fields. (N.B. You’ll need to add a reference to the System.Runtime.Serialization assembly in your project, and import the System.Runtime.Serialization and System.Runtime.Serialization.Json namespaces in your source file.)
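As a small illustration of how the annotations behave, the sketch below deserializes two JSON strings into a struct with one extra annotated field. The TweetWithLang type and the Parse helper are hypothetical names, and the lang key is my assumption about what the feed contains; the IEquatable<T> implementation is left out to keep the example short.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;

// Hypothetical variant of Tweet that captures one more metadata field.
[DataContract]
public struct TweetWithLang
{
    [DataMember]
    public string text;

    [DataMember]
    public string lang;   // assumed key; rename to match whichever field you want
}

public static class DataContractDemo
{
    // Deserializes one JSON object into a data-contract type.
    public static T Parse<T>(string json)
    {
        var serializer = new DataContractJsonSerializer(typeof(T));
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return (T)serializer.ReadObject(stream);
        }
    }

    public static void Main()
    {
        // Keys without a matching [DataMember] field (retweet_count here) are ignored.
        var full = Parse<TweetWithLang>(
            "{\"text\":\"hello #naiad\",\"lang\":\"en\",\"retweet_count\":3}");
        Console.WriteLine("{0} ({1})", full.text, full.lang);

        // Annotated fields that are absent from the JSON keep their default value.
        var partial = Parse<TweetWithLang>("{\"retweet_count\":3}");
        Console.WriteLine(partial.text == null);
    }
}
```

This is also why the Naiad program filters on x.text != null: any object in the stream that lacks a text key still deserializes, just with an empty Tweet.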

Processing a Twitter stream

Now that we have defined a Naiad computation for counting hashtags, we must write the code that supplies it with data.

string username = "...";
string password = "...";
NetworkCredential twitterCredentials = new NetworkCredential(username, password);
using (WebClient client = new WebClient())
{
    client.Credentials = twitterCredentials;
    using (var reader = new StreamReader(client.OpenRead("https://stream.twitter.com/1.1/statuses/sample.json")))
    {

For the sake of brevity, we’ll use HTTP basic authentication to connect to Twitter. Note that this does not conform to the best practices for using the Twitter API, and a more robust and future-proof client would use OAuth. Furthermore, it is unwise to embed your password in the source code, and a preferable approach would be to show a dialog box that reads your password from a masked text box.

        DataContractJsonSerializer dcjs = new DataContractJsonSerializer(typeof(Tweet));
        while (true)
        {
            string line = reader.ReadLine();
            using (var stringStream = new MemoryStream(Encoding.UTF8.GetBytes(line)))
            {
                Tweet t = (Tweet)dcjs.ReadObject(stringStream);
                tweets.OnNext(t);
            }
        }
    }
}

The format of the HTTP response is an unbounded stream of newline-delimited JSON objects, in each of which the key text maps to a string containing the body of a tweet. (The objects contain several other metadata fields beyond this, and you can capture them by adding appropriately annotated fields to the Tweet struct definition.) To parse these objects into a Tweet object, we first create a DataContractJsonSerializer (line 24), which processes the attributes on the type definition to generate deserialization code. We then loop forever, reading a line from the stream (line 27), converting it to a byte array for parsing (line 28) and deserializing it into a Tweet (line 30). In this simple program, Naiad ingests a single tweet at a time, so we simply call tweets.OnNext() with the single parsed tweet. Note that, if parsing were more expensive, we could parallelize it by moving this into the Naiad computation in a Select operator, and feeding the Naiad computation with strings instead. For more complex algorithms, it would be advantageous to batch several tweets in a single epoch, which you could do by adding them to a list and calling tweets.OnNext() with the list instead.
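The batching idea can be sketched independently of Naiad. In the example below, Batcher is a hypothetical helper that collects items and hands them to a callback in fixed-size lists; in the real program the callback would be a call to tweets.OnNext() with the list, as described above. I use a StringReader as a stand-in for the HTTP stream, and the loop also guards against two cases the simple loop does not: the end of the stream (ReadLine() returning null) and blank lines, which I am assuming may appear in the feed.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: collects items and hands them on in fixed-size batches.
public sealed class Batcher<T>
{
    private readonly int batchSize;
    private readonly Action<List<T>> onBatch;   // e.g. batch => tweets.OnNext(batch)
    private List<T> pending = new List<T>();

    public Batcher(int batchSize, Action<List<T>> onBatch)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize");
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // Adds one item, flushing automatically once the batch is full.
    public void Add(T item)
    {
        pending.Add(item);
        if (pending.Count >= batchSize) Flush();
    }

    // Hands over whatever has accumulated, if anything.
    public void Flush()
    {
        if (pending.Count == 0) return;
        List<T> batch = pending;
        pending = new List<T>();
        onBatch(batch);
    }
}

public static class BatchingDemo
{
    public static void Main()
    {
        // StringReader stands in for the StreamReader over the HTTP response.
        var reader = new StringReader(
            "{\"text\":\"a #ff\"}\n\n{\"text\":\"b #ff\"}\n{\"text\":\"c\"}\n");
        var batcher = new Batcher<string>(
            2, batch => Console.WriteLine("batch of {0}", batch.Count));

        string line;
        while ((line = reader.ReadLine()) != null)    // null marks the end of the stream
        {
            if (line.Trim().Length == 0) continue;    // skip blank lines
            batcher.Add(line);
        }
        batcher.Flush();   // hand over any partial final batch
    }
}
```

Here the batches carry raw strings rather than parsed tweets, which matches the variant where deserialization moves into a Select operator inside the computation. A fixed batch size is the simplest policy; flushing on a timer as well would bound the latency when the stream is quiet.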

If you run this code, you should eventually see output that looks something like this:

[ [#ff 2], 1 ]
[ [#nowplaying 2], 1 ]
[ [#ff 2], -1 ]
[ [#ff 3], 1 ]
[ [#teamfollowback 2], 1 ]

(I was running this on a Friday, so the #ff tag appears because it happens to be “Follow Friday”.)

Exercises for the reader

Counting hashtags is one of the simplest applications of Naiad to real-time Twitter streams. With a little more effort you can perform some more exciting analyses:

  • Use the tweets as input to a real-time search index.
  • Perform sentiment analysis on the tweets as they arrive, and track the public mood on different topics.
  • Extract usernames from the tweets and convert the stream into a graph. Frank has already posted details of how to use Naiad to analyze the connectivity of a graph, and even extract the strongly connected components. Naiad allows you to track how the graph structure changes in real time, by updating the results of these algorithms incrementally.
  • Compute the same algorithms over a sliding window of the stream, so that only tweets from the last hour or day are considered. As well as adding records, Naiad enables you to remove records from the input. In a future post, we’ll cover the different ways of computing sliding windows with Naiad, but for now you can experiment with the overloads for tweets.OnNext() that accept a (possibly negative) weight parameter.
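As a starting point for the sliding-window exercise, here is one way the bookkeeping might look, sketched without Naiad. SlidingWindow is a hypothetical helper that turns timestamped records into weighted updates: +1 when a record enters the window and -1 when it falls out. The emit callback is a stand-in for a weighted tweets.OnNext() overload, whose exact signature I am not assuming here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: emits +1 for each new record and -1 for each expired one.
public sealed class SlidingWindow<T>
{
    private readonly TimeSpan width;
    private readonly Action<T, int> emit;   // stand-in for a weighted OnNext overload
    private readonly Queue<KeyValuePair<DateTime, T>> live =
        new Queue<KeyValuePair<DateTime, T>>();

    public SlidingWindow(TimeSpan width, Action<T, int> emit)
    {
        this.width = width;
        this.emit = emit;
    }

    // Assumes records arrive in non-decreasing timestamp order.
    public void Add(T record, DateTime timestamp)
    {
        // Retract every record that is at least one window width older than the new one.
        while (live.Count > 0 && timestamp - live.Peek().Key >= width)
            emit(live.Dequeue().Value, -1);

        live.Enqueue(new KeyValuePair<DateTime, T>(timestamp, record));
        emit(record, +1);
    }
}

public static class SlidingWindowDemo
{
    public static void Main()
    {
        var window = new SlidingWindow<string>(
            TimeSpan.FromHours(1),
            (record, weight) => Console.WriteLine("{0} {1}", weight, record));

        var start = new DateTime(2012, 10, 24, 12, 0, 0);
        window.Add("#ff", start);
        window.Add("#nowplaying", start.AddMinutes(30));
        window.Add("#ff", start.AddMinutes(61));   // the first "#ff" is retracted first
    }
}
```

In this sketch expiry is only checked when a new record arrives, which is fine for a busy stream but would leave stale records in the window if the input went quiet.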
