We’ve walked through a few example programs in previous posts, computing the connected components of a symmetric graph and the strongly connected components of a directed graph, but we didn’t say very much about how a computer (or computers) might execute these programs. In this post we’ll do that at a high level by introducing differential dataflow, the main feature distinguishing Naiad’s computational model from previous approaches to data-parallel computation. It is what lets Naiad perform iterative computation efficiently, and update these computations in millisecond timescales.

First let’s start with an example of how a traditional data-parallel system like DryadLINQ (or Hadoop, etc.) might execute our connected components computation. Recall that we repeatedly supply the program with a new input graph for each epoch, and the program is expected to apply the loop body (Join, Concat, and Min operations) until the collection of labels stops changing. Let’s use $L(x,y)$ to represent the set of labels for the x’th input after y iterations. We are mostly interested in $L(x,\infty)$ for each value of x, but we’ll probably need each $L(x,y)$ along the way.

A traditional data-parallel system could start epoch x from $L(x,0)$ and explicitly derive each $L(x,y+1)$ using only $L(x,y)$. For each input graph, for as many steps as are required, we could perform the Join, Concat, and Min computations on the most recent data. Despite the likely connections between the collections at different iterations, each operator is applied to each input independently, as suggested in the following picture.

Three applications of an operator using standard dataflow techniques. Each invocation independently applies to unrelated inputs, producing unrelated outputs.

This approach should do a fine job of computing the correct answer, but it can take quite a while to do so. Each epoch, each iteration, the system re-performs the data-parallel operations on large amounts of data essentially from scratch, despite strong similarities between the computations epoch-by-epoch and iteration-by-iteration.

Instead, consider an incremental dataflow system, implemented so that its operators communicate and respond to *differences* between collections. If we think of a collection as a multiset, meaning a map from records to integer counts, we can think of a difference between two collections as the map from records to the integer difference in counts. Even if two collections are very large, their differences can be quite succinct. Rather than present a second input graph, for example, the user can just specify the list of edges that have been added or removed. Many data-parallel operators have simple and efficient incremental implementations: for each record with non-zero difference in the input, one only needs to recompute the subcomputation that record is destined for. For example, if the Min operator receives a difference adding the record (key, value), it only needs to update the subcomputation associated with key; other outputs are unaffected as their inputs have not changed. Implemented properly, input differences result in output differences, which become input differences for the next operator, and an entire dataflow becomes incrementally updateable.
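To make the idea of differences concrete, here is a minimal Python sketch of a multiset difference and an incrementally maintained Min. It is an illustration only, not Naiad's implementation: the names `difference` and `IncrementalMin` are hypothetical, and the sketch assumes that input differences never remove a record that is not present.

```python
from collections import Counter, defaultdict

def difference(old, new):
    """Map each record to (count in new) - (count in old), dropping zeros."""
    delta = Counter(new)
    delta.subtract(old)
    return {rec: n for rec, n in delta.items() if n != 0}

class IncrementalMin:
    """Hypothetical per-key Min that consumes and produces differences."""

    def __init__(self):
        self.values = defaultdict(Counter)  # key -> multiset of values seen

    def _current(self, key):
        vals = self.values.get(key)
        return min(vals) if vals else None

    def update(self, input_diff):
        # Group the changed records by key: only these keys are revisited.
        by_key = defaultdict(list)
        for (key, value), n in input_diff.items():
            by_key[key].append((value, n))

        output_diff = Counter()
        for key, changes in by_key.items():
            before = self._current(key)
            for value, n in changes:
                self.values[key][value] += n
                if self.values[key][value] == 0:
                    del self.values[key][value]
            after = self._current(key)
            if before != after:
                # Retract the old minimum and assert the new one.
                if before is not None:
                    output_diff[(key, before)] -= 1
                if after is not None:
                    output_diff[(key, after)] += 1
        return dict(output_diff)

old = Counter({("a", 3): 1, ("b", 5): 1})
new = Counter({("a", 3): 1, ("a", 1): 1, ("b", 5): 1})
op = IncrementalMin()
initial_output = op.update(dict(old))          # load the first collection
update_output = op.update(difference(old, new))  # then push only the change
```

In this example the second call touches only key `"a"`; key `"b"` is never examined because no record with that key appears in the input difference.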

Three applications of an operator using incremental dataflow techniques. The inputs and outputs are represented as differences (additions and subtractions) from the preceding collections.

The same mechanism can also be used for accelerating fixed-point loops. Each iteration produces a result which is meant to be re-supplied as input to the loop body, until it stops changing. Instead, the loop can feed back just the differences, and an incrementally updateable loop body can push the differences through. This results in more differences, and the incremental updating continues until there are no more changes, meaning that the computation has reached a fixed point.
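As a rough sketch of a fixed-point loop driven by differences, the following Python function propagates minimum labels through a symmetric graph, but in each iteration only revisits nodes whose label changed in the previous one. It stands in for the Join, Concat, and Min loop body rather than reproducing it, and the function name and return shape are assumptions made for illustration.

```python
from collections import defaultdict

def connected_components(edges, nodes):
    """Min-label propagation that only pushes labels changed last round."""
    neighbors = defaultdict(set)
    for a, b in edges:
        neighbors[a].add(b)
        neighbors[b].add(a)

    labels = {n: n for n in nodes}
    changed = set(nodes)          # differences produced by the previous iteration
    changes_per_iteration = []

    while changed:                # no differences left means a fixed point
        proposals = {}
        for n in changed:
            for m in neighbors[n]:
                # Propose n's label to m only if it improves on what m has.
                if labels[n] < proposals.get(m, labels[m]):
                    proposals[m] = labels[n]
        for m, label in proposals.items():
            labels[m] = label
        changed = set(proposals)
        changes_per_iteration.append(len(changed))

    return labels, changes_per_iteration

labels, changes = connected_components([(1, 2), (2, 3), (4, 5)], nodes=range(1, 6))
```

The list `changes` records how many labels changed in each iteration, which is the quantity that shrinks as the loop converges.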

The problem with incremental dataflow is that it really is either-or with respect to updates or loops. If you try to use incremental dataflow to process small updates to an iterative computation, you run into the problem that it isn’t clear what collections you should take differences between. If you always use the most current collection, like incremental dataflow does, you end up having to roll back the iterative work previously done to return to the correct point. If you do that, you have to re-do all the work, much of which may have stayed the same. Of course, there are lots of exceptions and patches and fixes you can make to this over-simplification (the research literature is full of them); differential dataflow is going to be a relatively simple way to side-step a lot of them.

The intuitive explanation behind differential dataflow is that as the collection of labels evolves, it does so for different reasons. The change from $L(1,1)$ to $L(1,2)$ reflects the change resulting from a single step of the loop body. The change from $L(1,1)$ to $L(2,1)$ reflects the introduction of some additional input supplied exogenously. Why should we have to choose between $L(1,2)$ and $L(2,1)$ when determining what comes “just before” $L(2,2)$? All other things being equal, shouldn’t both of these changes be useful in determining $L(2,2)$?

Rather than arrange the collections $L(x,y)$ in a total order and take differences, as required by incremental dataflow, we allow a partial order (in which two elements x and y may have neither $x \le y$ nor $y \le x$). For example, in the case of $L(x,y)$ above, we would say that $(i,j) \le (x,y)$ iff $i \le x$ and $j \le y$. This puts the $L(x,y)$ in a grid, rather than a line, where $L(i,j)$ “comes before” $L(x,y)$ whenever $(i,j)$ lies in the rectangle defined by $(x,y)$.
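The partial order on (epoch, iteration) pairs is small enough to write down directly. In this sketch the helper names `leq` and `predecessors` are hypothetical, and indices are assumed to start at 0.

```python
def leq(a, b):
    """Product order: a <= b iff both coordinates of a are <= those of b."""
    return a[0] <= b[0] and a[1] <= b[1]

def predecessors(point):
    """All grid points (i, j) <= point, i.e. the rectangle it defines."""
    x, y = point
    return [(i, j) for i in range(x + 1) for j in range(y + 1)]

# (1, 2) and (2, 1) are incomparable, yet both come before (2, 2).
incomparable = not leq((1, 2), (2, 1)) and not leq((2, 1), (1, 2))
both_before = leq((1, 2), (2, 2)) and leq((2, 1), (2, 2))
```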

One only has to think for a moment before one’s brain starts to hurt. If there isn’t a total order on the collections, what are the differences *between*? I hope all of you remember your multi-variate calculus. What we are going to do is the equivalent of going from derivatives (where change in f(x) is measured with respect to change in x) to second-order derivatives (where change in f(x,y) is measured with respect to both x and y) and beyond. We will define and record $\delta L(x,y)$, sort of like a derivative, so that

$L(x,y) = \sum_{(i,j) \le (x,y)} \delta L(i,j)$

If you squint just right, that expression looks a lot like integration over the region defined by (x,y). If you take the equation above and “solve” for any specific $\delta L(x,y)$, you get

$\delta L(x,y) = L(x,y) - \sum_{(i,j) < (x,y)} \delta L(i,j)$

For each (x,y) pair, the difference recorded by differential dataflow is between the collection $L(x,y)$ and the sum of all prior differences that would participate in the sum. It is exactly the difference required to bring the sum equal to $L(x,y)$. If the differences are already doing a good job in representing the collection there is little to change, and not much computation to perform or additional data to store. In picture form, it looks a bit like this:

If the collection $A(1,1)$ only requires a small $\delta A(1,1)$, then there isn’t much work to do in determining output changes $\delta B(x,y)$. One of the main advantages of differential dataflow is that the sizes of the differences $\delta A(x,y)$ can be very small, especially as compared to incremental dataflow, resulting in substantially less work to perform. We’ll talk more about how to compute differences efficiently in upcoming posts, but for the moment notice that simply invoking data-parallelism gets us a long way. If we treat our data-parallel computation as a large collection of independent differential-dataflow-ed subcomputations (with inputs and outputs represented differentially), we can avoid doing any work for subcomputations whose input differences are empty.
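The two equations for $L(x,y)$ and $\delta L(x,y)$ can be checked on a tiny hand-built grid. The sketch below is not how Naiad computes differences (it starts from fully materialized collections, which is exactly what a real system wants to avoid); it only verifies the definition. The function names and the example collections are assumptions: three nodes, an edge (2,3) in epoch 0, an added edge (1,2) in epoch 1, and records of the form (node, label).

```python
from collections import Counter

def leq(a, b):
    return a[0] <= b[0] and a[1] <= b[1]

def accumulate(deltas, point):
    """L(point): sum of delta L(i, j) over all recorded (i, j) <= point."""
    total = Counter()
    for other, delta in deltas.items():
        if leq(other, point):
            for rec, n in delta.items():
                total[rec] += n
    return {rec: n for rec, n in total.items() if n != 0}

def differentiate(collections):
    """delta L(x, y) = L(x, y) - sum of delta L(i, j) over (i, j) < (x, y)."""
    deltas = {}
    # Lexicographic order visits every predecessor of a point before the point.
    for point in sorted(collections):
        prior = accumulate(deltas, point)   # deltas[point] is not recorded yet
        delta = Counter(collections[point])
        delta.subtract(prior)
        deltas[point] = {rec: n for rec, n in delta.items() if n != 0}
    return deltas

# Hypothetical label collections L[(epoch, iteration)] for the example graph.
L = {
    (0, 0): [(1, 1), (2, 2), (3, 3)],
    (0, 1): [(1, 1), (2, 2), (3, 2)],
    (0, 2): [(1, 1), (2, 2), (3, 2)],
    (1, 0): [(1, 1), (2, 2), (3, 3)],
    (1, 1): [(1, 1), (2, 1), (3, 2)],
    (1, 2): [(1, 1), (2, 1), (3, 1)],
}
deltas = differentiate(L)
```

In this example `deltas[(1, 0)]` and `deltas[(0, 2)]` are empty, and the differences at `(1, 1)` and `(1, 2)` each hold just one retraction and one addition, even though every collection has three records.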

Let’s look at the sizes of these differences in our connected components computation. Just below we’ve plotted the number of label changes (counting both additions and subtractions) for a connected components computation on a random graph with 1M nodes and 1M edges, as a function of the iteration. We’ve also plotted the number of changes needed when we randomly change 1000, 100, 10, and 1 edges.

Number of changes to the collection of labels for a 1M node and 1M edge random graph, as a function of the iteration. Also, the number of differences required by differential dataflow to update the computation when different numbers of edges are changed.

The initial run behaves as we might expect: lots of churn, until eventually nodes start settling down on their labels and then an exponential decrease in the number of label changes, as in incremental dataflow. The exciting new feature shows up when we update the input data. For a small change to the input, the change to each of the iterations is also small. Differential dataflow has given us the ability to do substantially less work, assuming we can efficiently determine what these output changes are. By comparison, naive incremental dataflow techniques must essentially restart the computation with each update to the input, no matter how few updates (unless there are zero, of course). Each of the lines would look like the initial run, many orders of magnitude larger than it needs to be under differential dataflow.

When we look at elapsed times for the same computations, we see that they roughly track the amount of work that is required. Many of the iterations with few updates required take less than a millisecond. You might also notice that the absolute times for the initial run and even the 1,000 change run are not great; we’ll talk more about optimizing the algorithm in a future post on “prioritization” in Naiad, which brings the total running time of the initial run down to just a few seconds.

Reported milliseconds between consecutive iterations. Many iterations associated with small numbers of differences take less than a millisecond. The single change update takes only 7 milliseconds for the entire execution.

This has been a light sketch of how differential dataflow works. Representing collections as differences, and keeping the differences separate, we are able to add up appropriate subsets of differences corresponding to various points in the computation. This additional flexibility comes at the cost of additional memory overhead, as we need to keep all of the differences in a disaggregated form, but can result in a substantial reduction in the amount of computation required. The millisecond timescales from the previous posts are possible only because of this alternate representation. We’ll likely get into more detail, but in the meantime feel free to read our CIDR paper on the subject.