Skip to content
Tags

Building new frameworks on Naiad

by on April 29, 2014

Since we first announced Naiad back in 2012, the system has grown in many ways, including performance, robustness, and support for other platforms. From a developer’s point of view, the biggest change came in early 2013, when we split the project into two parts, so that our differential dataflow implementation became a library on top of the more general distributed Naiad runtime. The main consequence of this split is that you can now build new frameworks and DSLs on top of Naiad, and take full advantage of its high-performance runtime. In future posts, we will talk more about some of the frameworks that we have released with Naiad. Today I’m going to show how easy it is to build a new framework, and the opportunities that this opens up.

Background

We originally created Naiad because we liked the programming experience of DryadLINQ for cluster computing, but saw that its system design give it suboptimal performance for many applications — particularly for iterative and incremental workloads. We therefore wrote a version of LINQ called “NaiadLINQ” and co-designed a simple parallel runtime that would execute these programs on clusters of multicore machines.

NaiadLINQ was a fairly unique language: it could describe fully-incremental nested-iterative data-parallel programs, and the accompanying system could execute these efficiently across a modestly-sized cluster, as we showed in our CIDR paper on differential dataflow. On the other hand, these fully incremental programs consume more memory and computation than their non-incremental counterparts, because differential dataflow uses a custom data structure, called a “collection trace”, to store different versions of the state at each operator. The resulting system had unprecedented performance for many incremental computations, but this is a specialized use case: many iterative jobs are non-incremental, many incremental jobs are non-iterative, and surprisingly many people just want to count words.

At this point, in late 2012, we reflected on the state of the project, and realized that the anonymous runtime underneath NaiadLINQ had accumulated many desirable properties. To make our NaiadLINQ programs run efficiently, we had to design (and prove the correctness of) a lightweight distributed protocol for progress tracking and coordination. We made it possible to schedule operators asynchronously across multiple processes, optimized the amount of data that those processes exchanged, and tuned the networking until those data could be exchanged with extremely low latency.

If only we could avoid the overhead of differential dataflow, we would have a system that could match — or even exceed — the performance of the specialized systems that were emerging around that time.

Enter timely dataflow

We split NaiadLINQ into two parts in early 2013. We named the distributed runtime “Naiad”, and its low-level interface “timely dataflow”. I blogged previously about timely dataflow, and we wrote it up in our SOSP 2013 paper. Naiad retains several concepts from NaiadLINQ: it has a stateful, streaming execution model with loops; logical timestamps enable applications to differentiate between different versions of data; and processes can coordinate at a fine (millisecond-scale) granularity.

Timely dataflow is lower level than other recent programming models for cluster computing. A computation is built from individual vertices, which can have arbitrary state, and implement two types of callback:

  • The OnRecv callback is invoked each time a message is received on one of the inputs to the vertex. This callback can read and write the local state, send messages on any of the vertex outputs, and optionally request a notification when all of the messages at a given time have been received.
  • The OnNotify callback is invoked when all of the messages bearing a particular timestamp have been received. This callback can also read and write the local state, send messages on any of the vertex outputs, and request future notifications.

Message delivery is asynchronous: it requires no coordination, and enables work done and messages sent in OnRecv to be asynchronous as well. Notifications are synchronous, and use the progress tracking protocol to ensure that they are delivered promptly. This typically leads to OnRecv performing embarrassingly parallel work (like the Map and Combine phases in MapReduce), and OnNotify performing work related to aggregation (like the Reduce phase).

If you’re a data scientist, such a low-level API might seem unappealing next to something more easily understood, like relations in SQL, incremental collections in NaiadLINQ, RDDs in Spark, or update functions in GraphLab. We completely agree! That’s why we explicitly designed Naiad to support higher-level frameworks — like differential dataflow — as libraries of parameterized vertices. Moreover by giving developers so much control over what executes, Naiad makes it possible to implement custom data structures and algorithms in their vertices, and thereby outperform general-purpose approaches.

Higher-level frameworks

What do I mean by “higher-level framework”? A Naiad framework is a library that:

  • Provides extension methods or a wrapper type for connecting Naiad streams to and from parallel stages of vertices.
  • Contains vertex implementations, or assembles functionality from other frameworks.
  • Lives outside the core Naiad runtime, and uses only public APIs.

NaiadLINQ’s implementation of differential dataflow became the first high-level framework when we split the codebase. Subsequently, we implemented a non-incremental LINQ-with-iteration framework, called Lindi, which has simpler and more efficient implementations for many of the stateful operators (such as Aggregate). We recently added a framework called GraphLINQ, which goes even further and optimizes several aggregation and data exchange operators. Future blog posts will discuss these frameworks in more detail.

Naiad is not the first system to support higher-level frameworks. The most prominent is (Hadoop) MapReduce, which supports a variety of SQL-like frameworks (Pig, Hive, etc.). More recently, Spark has begotten Shark, Spark Streaming, and GraphX. Each of these new frameworks required substantial engineering effort, documented in several major research papers. However, we contend that Naiad (or timely dataflow) is a better foundation for these frameworks: for example, Naiad makes it possible to implement all of the graph-specific optimizations in PowerGraph (the best-in-class graph processing system from OSDI 2012), while the performance of GraphX is limited by fixed costs in RDD execution.

We also intended the Naiad API to make it easier for developers to create new frameworks. To illustrate the steps and the level of complexity involved, I’ll round out this post with a code example for a simple framework.

Writing a new framework

Our example framework will have a single operator, Threshold, that produces a stream of all the records appearing at least n times in the input stream. I’ll use pseudocode to keep some things short, but the whole example code is available here.

The first step is to define the method that application programmers will use to interact with the operator. In this case, we’ll use an extension method on Stream<R, T> objects, enabling our new operator to be used in any Naiad computation:

        public static Stream<R, T> 
        Threshold<R, T>(this Stream<R, T> input, int n)
            where T : Time<T>
        {
            // Implementation goes here.
        }

By writing the extension method this way, all Naiad streams gain a method called
Threshold(int)
:

Stream<string, Epoch> strings = ...;

Stream<string, Epoch> fiveOrMore = strings.Threshold(5);

Now we turn to the implementation of the vertex. There are several ways to write this vertex. If we were using MapReduce or Spark, the natural implementation would use  reducers that count instances of each record, and emit the record in cases where the count is greater than the threshold. However, this would block the overall computation until all input records have been shuffled to the last reducer, which is a major source of stragglers in MapReduce. In Naiad we can do better, by emitting records as soon as they exceed the threshold, and thereby allowing downstream work to get started immediately.

The logic of our thresholding operator is implemented in a class that extends Naiad’s Vertex class:

        class ThresholdVertex<R, T> : UnaryVertex<R, R, T>
            where T : Time<T>
        {
            private readonly int threshold;

            // Per-time counts for each record.
            private readonly Dictionary<T, Dictionary<R,int>> counts;

            public override void OnReceive(Message<R, T> message)
            {
                var time = message.time;

                // Get the state for this time.
                // ...
                // Request a notification when we are finished with 
                // this time.
                this.NotifyAt(time);

                // For each record in the message.
                for (int i = 0; i < message.length; ++i)
                {
                    var record = message.payload[i];

                    // Read-modify-update current count for record.
                    // ...

                    // Send record only when it crosses threshold.
                    // ...
                }
            }

            public override void OnNotify(T time)
            {
                // Release the state associated with the given time.
                // ...
            }

            public ThresholdVertex(int index,
                                   Stage<T> stage,
                                   int threshold)
                : base(index, stage)
            {
                this.threshold = threshold;
                this.counts = new Dictionary<T, Dictionary<R,int>>();
            }
        }

Since the operator has a single input and a single output, we can extend the UnaryVertex class, which provides an OnReceive method to override and an Output property for sending records. The bulk of the work happens in OnReceive, which updates the state for each received record, and sends the record when it first exceeds the threshold. Notifications are used to garbage collect the state associated with each time, after that time has completed.

The final step is to fill out the implementation of the extension method we wrote above:

        public static Stream<R, T>
        Threshold<R, T>(this Stream<R, T> input, int n)
            where T : Time<T>
        {
            return input.NewUnaryStage(
                (i, s) => new ThresholdVertex<R, T>(i, s, n),
                x => x.GetHashCode(), x => x.GetHashCode(),
                "Threshold");
        }

The extension method defines a new Stage, which is a parallel collection of dataflow vertices, analogous to the “map stage” or “reduce stage” of a MapReduce job. Since we have built a unary vertex, the NewUnaryStage convenience method is available. To create a stage, we must tell Naiad how to construct an individual vertex (using a factory function), and how to partition records between the vertices (using the hashcode). Additionally in this case, we can tell Naiad how the output records will be partitioned (again by hashcode, since the operator doesn’t modify the records), which enables downstream vertices to optimize data exchange in some cases.

The complete code for the operator is just 76 lines of C#, and you can see it all here.

Going further

Of course I’ve shown you one of the simplest possible vertex implementations (except perhaps for the Lindi operators), but there are several ways in which it could be extended. The current implementation stores every record explicitly, but if the downstream computation could handle some false positives, then the vertex could implement streaming algorithms like the count-min sketch and HyperLogLog. Naiad vertices can have multiple outputs, so it would be possible to have a second output that reports the final counts for each record, or other summary statistics. We have even implemented vertices that interoperate with code in another language, so that Naiad can use optimized numerical routines.

By building Naiad with this flexibility, we hope to enable parallel innovation. Above the timely dataflow API, it is easy for developers to experiment with new frameworks, algorithms, and languages for parallel computing. Meanwhile below, we will continue to make advances in the systems technology that makes all Naiad programs more efficient and robust. That’s why we made the code open source: we’re excited to see what you’ll build.

Derek Murray (@mrry) is a member of the Naiad team at Microsoft Research Silicon Valley.

Advertisements

From → Naiad

Leave a Comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: