# Implementing Sequential Iteration in Naiad

In this post we will cover a programming pattern frequently thought to be incompatible with data-parallel systems: sequentially updating each value in a collection as a function of the most recent values in that collection. In C#, this would look something like:

```csharp
// sequentially updates an array of values
for (int i = 0; i < values.Length; i++)
    values[i] = updateBasedOn(values);
```

This type of iterative process is at the heart of several graph algorithms, from efficient PageRank computation, to graph coloring, to general loopy belief propagation. Although the logic looks very sequential, we will see that it can be made quite parallel in the context of graph processing.

Let’s start by defining the problem. Imagine we have a graph on n nodes, with directed edges between some of the nodes, and with the nodes visited in a fixed sequential order (in the code below, the order of their integer identifiers). Each node has an update function, which reads the states of all of its incoming neighbors and updates its own state. The desired output is the result of running the update functions sequentially, as above. The only difference from the code fragment above is the structure imposed on the update function: it only gets to see (and depend on) the values indicated by edges in the graph. Explicit access to this graph is what will allow us to parallelize the computation.
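To pin down the semantics we are after, here is a minimal single-machine sketch in plain C# (no Naiad). It assumes nodes are numbered 0 to n-1 and visited in identifier order, and that the graph is a list of directed (source, target) pairs; the name `SequentialIteration` is hypothetical, a reference point for checking results rather than part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reference semantics: nodes 0..n-1 are updated in identifier order,
// and each update sees only the current values of the node's incoming neighbors.
S[] SequentialIteration<S>(S[] values, (int s, int t)[] edges, Func<IEnumerable<S>, S> update)
{
    // incoming[t] lists every source s with an edge s -> t
    var incoming = new List<int>[values.Length];
    for (int i = 0; i < values.Length; i++) incoming[i] = new List<int>();
    foreach (var (src, dst) in edges) incoming[dst].Add(src);

    var state = (S[])values.Clone();
    for (int i = 0; i < state.Length; i++)
    {
        // earlier nodes already hold their new values; later nodes still hold old ones
        var inputs = incoming[i].Select(j => state[j]).ToList();
        state[i] = update(inputs);
    }
    return state;
}

// Example: on the cycle 0 -> 1 -> 2 -> 0, each node becomes 1 + the sum of its inputs.
var ring = new[] { (0, 1), (1, 2), (2, 0) };
var output = SequentialIteration(new[] { 0, 0, 0 }, ring, xs => 1 + xs.Sum());
Console.WriteLine(string.Join(", ", output)); // prints "1, 2, 3"
```

Node 0 still sees node 2's old value (0), while nodes 1 and 2 see the values just written by their predecessors; that asymmetry is exactly what a naive data-parallel version loses.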

As a running example, consider coloring a graph: assigning labels (colors) to the vertices so that every edge connects vertices of different colors. We use each node’s color as its state, and the update function sets the state to the smallest color not found at any of the node’s neighbors:

```csharp
// finds the least non-negative value not in a supplied set of values
int FindColor(IEnumerable<int> colors)
{
    var hashset = new HashSet<int>();
    foreach (var element in colors)
        hashset.Add(element);

    var myColor = 0;
    while (hashset.Contains(myColor))
        myColor++;

    return myColor;
}
```

Executing this update function for each node in sequence colors the graph, provided each edge is present in both directions so that every node sees all of its neighbors.
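As a quick check of that claim, the sketch below runs the update in identifier order on a small 4-cycle, with every undirected edge listed in both directions. `SequentialColor` and the edge-list representation are assumptions for illustration; `FindColor` behaves like the function above, using the `HashSet<int>` constructor instead of an explicit loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same behavior as FindColor above: the least non-negative value not in the input.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Hypothetical helper: greedy coloring, visiting nodes in identifier order.
// -1 marks a node that has not been colored yet.
int[] SequentialColor(int n, (int s, int t)[] edges)
{
    var colors = Enumerable.Repeat(-1, n).ToArray();
    for (int v = 0; v < n; v++)
        colors[v] = FindColor(edges.Where(e => e.t == v).Select(e => colors[e.s]));
    return colors;
}

// The 4-cycle 0-1-2-3-0, with each undirected edge present in both directions.
var undirected = new[] { (0, 1), (1, 2), (2, 3), (3, 0) };
var cycle = undirected.Concat(undirected.Select(e => (e.Item2, e.Item1))).ToArray();
var coloring = SequentialColor(4, cycle);
Console.WriteLine(string.Join(", ", coloring)); // prints "0, 1, 0, 1"
```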

Putting aside the issue of sequential execution for the moment, let’s see what happens if we try to write a data-parallel implementation of graph coloring. We use a standard pattern: join the state with the set of edges, effectively addressing a copy of each node’s state to its interested neighbors, which collect these copies and perform their update logic.

```csharp
// performs one data-parallel update: each node recolors itself using the
// current colors of its in-neighbors.
Collection<IntPair, T> ColorStep<T>(Collection<IntPair, T> colors, Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    return graph.Join(colors, e => e.s, c => c.s, (e, c) => new IntPair(e.t, c.t))
                .Concat(colors.Select(x => new IntPair(x.s, -1)))
                .GroupBy(c => c.s, c => c.t, (n, cs) => new IntPair(n, FindColor(cs)));
}
```

The Concat operator makes sure the update logic fires even if a node has no incoming edges (otherwise, the computation would produce no output state for that node). The extra value -1 is harmless, because FindColor only ever returns non-negative colors.

Unfortunately, this computation doesn’t necessarily produce the same result as the sequential execution. The problem is that each node updates its color using the initial values of the other nodes, rather than their most recent values, so there is no reason to believe that the result is a valid coloring. We could run it repeatedly, though, hoping to make some progress:

```csharp
// takes a graph, returns a coloring of the graph (or diverges).
Collection<IntPair, T> Color<T>(Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    // create some initial bogus values for all nodes in the graph
    var start = graph.SelectMany(edge => new[] { edge.s, edge.t })
                     .Distinct()
                     .Select(n => new IntPair(n, -1));

    return start.FixedPoint(c => ColorStep(c, graph.ExtendTime()));
}
```

This also doesn’t produce the same result, and it may not even terminate. Consider a simple graph with two nodes joined by one edge (present in both directions), where both nodes start with the same color: in each iteration both nodes make the same decision about which color to try next, so they both switch back and forth between the first two colors forever.
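The failure is easy to reproduce without Naiad. The sketch below (plain C#, hypothetical names) applies a data-parallel step, in which every node reads only the previous iteration's colors, to two nodes joined by an edge in both directions, and stops after a fixed cap of six iterations as a stand-in for `FixedPoint`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Least non-negative value not in the input, as FindColor above.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Hypothetical analogue of the first ColorStep: every node recomputes its color
// from the previous iteration's colors of its in-neighbors, all at once.
int[] ParallelStep(int[] current, (int s, int t)[] edges) =>
    Enumerable.Range(0, current.Length)
              .Select(v => FindColor(edges.Where(e => e.t == v).Select(e => current[e.s])))
              .ToArray();

// Two nodes, one undirected edge (present in both directions), both starting at -1.
var pair = new[] { (0, 1), (1, 0) };
var history = new List<int[]> { new[] { -1, -1 } };
for (int iter = 0; iter < 6; iter++)
    history.Add(ParallelStep(history[history.Count - 1], pair));

// prints "-1, -1", then alternates between "0, 0" and "1, 1"
foreach (var step in history)
    Console.WriteLine(string.Join(", ", step));
```

Every iteration after the first leaves both endpoints of the edge with the same color, and no two consecutive iterations agree, so a fixed-point loop would never stop.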

Fortunately, there is an easy fix! The problem is that each node considers the updated values of all of its neighbors, whereas in sequential iteration it would only see the updated values of neighbors before it in the sequential order (here, neighbors with smaller identifiers). We can make that happen by rewriting the update step so that current values flow only forward in the graph, using a fixed set of stale values for the messages along backward edges:

```csharp
// performs one update step in which current values flow only forward in the graph,
// while values along backward edges come from the fixed set of stale values.
Collection<IntPair, T> ColorStep<T>(Collection<IntPair, T> colors,
                                    Collection<IntPair, T> stale,
                                    Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    // restrict edges to those pointing forwards
    return graph.Where(edge => edge.s < edge.t)
                .Join(colors, e => e.s, c => c.s, (e, c) => new IntPair(e.t, c.t))
                .Concat(colors.Select(x => new IntPair(x.s, -1)))
                .Concat(stale)
                .GroupBy(c => c.s, c => c.t, (n, cs) => new IntPair(n, FindColor(cs)));
}
```

Now we just need to rework the main method to compute the fixed set of stale messages and pass them to the update step.

```csharp
// takes a graph, returns the coloring produced by sequential iteration.
Collection<IntPair, T> Color<T>(Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    // create some initial bogus values for all nodes in the graph
    var start = graph.SelectMany(edge => new[] { edge.s, edge.t })
                     .Distinct()
                     .Select(n => new IntPair(n, -1));

    // capture values flowing backwards in the graph
    var stale = graph.Where(edge => edge.s > edge.t)
                     .Join(start, e => e.s, c => c.s, (e, c) => new IntPair(e.t, c.t));

    return start.FixedPoint(c => ColorStep(c, stale.ExtendTime(), graph.ExtendTime()));
}
```

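This loop always converges, and to exactly the result of the sequential iteration. Current values only flow from smaller to larger identifiers, so the node with the smallest identifier has its final value after one iteration, the next node after at most two, and so on. After at most n iterations nothing changes, and at that point every node has applied its update to the new values of earlier neighbors and the stale values of later ones, just as in the sequential loop.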
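The claim can be checked on a small example. The sketch below is a plain C# analogue of the revised `ColorStep`, assuming identifier order as the sequential order: edges with `s < t` carry current colors, edges with `s > t` carry stale colors fixed before the loop, and self-loops are ignored, as with the two `Where` clauses above. It iterates to a fixed point and compares the result against a direct sequential run; all names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Least non-negative value not in the input, as FindColor above.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Analogue of the revised ColorStep: forward edges read current colors,
// backward edges read the stale colors captured before the loop.
int[] ForwardStep(int[] current, int[] stale, (int s, int t)[] edges) =>
    Enumerable.Range(0, current.Length)
              .Select(v => FindColor(edges.Where(e => e.t == v && e.s != v)
                                          .Select(e => e.s < v ? current[e.s] : stale[e.s])))
              .ToArray();

// Direct sequential loop over the same graph, for comparison.
int[] SequentialColor(int[] start, (int s, int t)[] edges)
{
    var state = (int[])start.Clone();
    for (int v = 0; v < state.Length; v++)
        state[v] = FindColor(edges.Where(e => e.t == v && e.s != v).Select(e => state[e.s]));
    return state;
}

// 4-cycle 0-1-2-3-0 with both edge directions; every node starts at -1.
var undirected = new[] { (0, 1), (1, 2), (2, 3), (3, 0) };
var cycle = undirected.Concat(undirected.Select(e => (e.Item2, e.Item1))).ToArray();
var init = new[] { -1, -1, -1, -1 };
var frozen = (int[])init.Clone(); // stale values, fixed for the whole loop

var coloring = init;
int steps = 0;
while (true)
{
    var next = ForwardStep(coloring, frozen, cycle);
    steps++;
    if (next.SequenceEqual(coloring)) break; // fixed point reached
    coloring = next;
}

Console.WriteLine($"{string.Join(", ", coloring)} after {steps} steps"); // prints "0, 1, 0, 1 after 5 steps"
```

The intermediate colors are not monotone (node 3 briefly takes color 2), but once the inputs from earlier nodes settle, each node settles too; the fifth step only confirms that nothing changed after four.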

Finally, one is often interested in running not just one pass of sequential iteration, but several. This is just a matter of making the starting set of values a parameter and wrapping another loop around the new method:

```csharp
// implements one full round of sequential iteration.
Collection<IntPair, T> ColorRound<T>(Collection<IntPair, T> start, Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    // capture values flowing backwards in the graph
    var stale = graph.Where(edge => edge.s > edge.t)
                     .Join(start, e => e.s, c => c.s, (e, c) => new IntPair(e.t, c.t));

    return start.FixedPoint(c => ColorStep(c, stale.ExtendTime(), graph.ExtendTime()));
}

// performs many rounds of sequential iteration.
Collection<IntPair, T> Color<T>(Collection<IntPair, T> graph)
    where T : Lattice<T>
{
    // create some initial bogus values for all nodes in the graph
    var start = graph.SelectMany(edge => new[] { edge.s, edge.t })
                     .Distinct()
                     .Select(n => new IntPair(n, -1));

    return start.FixedPoint(c => ColorRound(c, graph.ExtendTime()));
}
```

In fact, coloring as we have done it only ever requires one round (subsequent rounds reach the same result), but other, more complicated problems do require multiple rounds. Coloring just happens to be simple enough to present easily.
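A plain C# sketch of the outer loop, assuming the same identifier order: each round runs the sequential update starting from the previous round's values, so later neighbors contribute last round's colors. Here `ColorRound` is a hypothetical sequential stand-in for the data-parallel `ColorRound` above, not Naiad code. On a triangle with one pendant node, the second round reproduces the first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Least non-negative value not in the input, as FindColor above.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Sequential stand-in for one round: earlier neighbors contribute this round's
// colors, later neighbors still hold the previous round's colors.
int[] ColorRound(int[] start, (int s, int t)[] edges)
{
    var state = (int[])start.Clone();
    for (int v = 0; v < state.Length; v++)
        state[v] = FindColor(edges.Where(e => e.t == v && e.s != v).Select(e => state[e.s]));
    return state;
}

// Triangle 0-1-2 plus a pendant node 3 attached to 2, edges in both directions.
var undirected = new[] { (0, 1), (1, 2), (2, 0), (2, 3) };
var graph = undirected.Concat(undirected.Select(e => (e.Item2, e.Item1))).ToArray();

// Outer loop: repeat rounds until a round leaves every value unchanged.
var rounds = new List<int[]> { new[] { -1, -1, -1, -1 } };
while (true)
{
    var next = ColorRound(rounds[rounds.Count - 1], graph);
    if (next.SequenceEqual(rounds[rounds.Count - 1])) break;
    rounds.Add(next);
}

Console.WriteLine(string.Join(", ", rounds[rounds.Count - 1])); // prints "0, 1, 2, 0"
Console.WriteLine($"rounds that changed something: {rounds.Count - 1}"); // prints "rounds that changed something: 1"
```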

So we have implemented multiple rounds of sequential iteration in a data-parallel manner, but does it actually perform well? Each round of sequential iteration should update each node once and communicate values along each edge once, whereas the implementation above may do more work. It may seem that the problem is that every node reconsiders its inputs in each iteration of the inner loop, doing as much work per iteration as a whole round of sequential iteration. Remember, however, that the implementation is incremental: work only happens when values change, and that is what we need to look at.

The actual problem is that nodes may execute their logic and communicate results before their inputs have stopped changing, which means they may be invoked more than once per round. We can fix this by adding another meta-value, “I don’t know yet”, with which all nodes start each round (the color “-1” above is an excellent candidate). Each node waits until all of its inputs from neighbors earlier in the sequential order are set (non-negative), and only then changes its output from “I don’t know yet” to a concrete value. This happens only once each round, so each node is updated once and each edge carries only one message per round.
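To see why the guard bounds the work, here is a plain C# simulation of one round (hypothetical names, not Naiad code). It assumes -1 encodes “I don’t know yet” and that the computation proceeds in synchronous steps in which every node reads the previous step's values; it counts how often nodes evaluate the update and how many values they send along forward edges.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Least non-negative value not in the input, as FindColor above.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Hypothetical simulation of one round with "I don't know yet" encoded as -1.
// In each synchronous step, an undecided node fires only if every earlier
// in-neighbor is decided; later in-neighbors contribute their stale values.
(int[] colors, int evaluations, int messages) GuardedRound(int[] stale, (int s, int t)[] edges)
{
    int n = stale.Length;
    var state = Enumerable.Repeat(-1, n).ToArray();
    int evaluations = 0, messages = 0;
    bool progress = true;
    while (progress)
    {
        progress = false;
        var snapshot = (int[])state.Clone(); // values visible during this step
        for (int v = 0; v < n; v++)
        {
            if (snapshot[v] >= 0) continue; // already decided this round
            var earlier = edges.Where(e => e.t == v && e.s < v).Select(e => e.s).ToList();
            if (earlier.Any(u => snapshot[u] < 0)) continue; // still waiting on an input
            var later = edges.Where(e => e.t == v && e.s > v).Select(e => stale[e.s]);
            state[v] = FindColor(earlier.Select(u => snapshot[u]).Concat(later));
            evaluations++;
            messages += edges.Count(e => e.s == v && e.t > v); // one value per forward out-edge
            progress = true;
        }
    }
    return (state, evaluations, messages);
}

// Triangle 0-1-2 plus pendant node 3, both edge directions, first round (stale = -1).
var undirected = new[] { (0, 1), (1, 2), (2, 0), (2, 3) };
var graph = undirected.Concat(undirected.Select(e => (e.Item2, e.Item1))).ToArray();
var (coloring, evals, msgs) = GuardedRound(new[] { -1, -1, -1, -1 }, graph);

Console.WriteLine($"{string.Join(", ", coloring)}; {evals} evaluations, {msgs} messages");
// prints "0, 1, 2, 0; 4 evaluations, 4 messages"
```

Each of the four nodes is evaluated exactly once and each of the four forward edges carries exactly one value, and the result matches the sequential coloring of the same graph.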

The incremental implementation has other advantages as well: each round of the outer iteration only performs work for values that have changed, and can reuse the inner iterations of previous rounds, essentially skipping over uninteresting elements in the sequential update. One can implement this optimization by hand in sequential iteration using dirty bits: set the dirty bit of each changed node’s neighbors, and only update nodes whose dirty bit is set. Here, differential dataflow provides the optimization automatically.
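For comparison, here is a minimal sketch of the hand-written dirty-bit optimization in plain C# (hypothetical names; identifier order as the sequential order; each undirected edge given in both directions). It runs rounds until nothing changes, once re-evaluating every node in every round and once re-evaluating only dirty nodes, and counts the evaluations.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Least non-negative value not in the input, as FindColor above.
int FindColor(IEnumerable<int> colors)
{
    var seen = new HashSet<int>(colors);
    var myColor = 0;
    while (seen.Contains(myColor)) myColor++;
    return myColor;
}

// Recompute one node from the current colors of its in-neighbors (self-loops ignored).
int Update(int[] state, (int s, int t)[] edges, int v) =>
    FindColor(edges.Where(e => e.t == v && e.s != v).Select(e => state[e.s]));

// Plain rounds of sequential iteration: every node is re-evaluated in every round.
int PlainRounds(int[] state, (int s, int t)[] edges)
{
    int evaluations = 0;
    bool changed = true;
    while (changed)
    {
        changed = false;
        for (int v = 0; v < state.Length; v++)
        {
            var next = Update(state, edges, v);
            evaluations++;
            if (next != state[v]) { state[v] = next; changed = true; }
        }
    }
    return evaluations;
}

// Dirty-bit version: only dirty nodes are re-evaluated, and a node whose value
// changes marks its out-neighbors dirty.
int DirtyRounds(int[] state, (int s, int t)[] edges)
{
    int evaluations = 0;
    var dirty = Enumerable.Repeat(true, state.Length).ToArray();
    while (dirty.Any(d => d))
    {
        for (int v = 0; v < state.Length; v++)
        {
            if (!dirty[v]) continue;
            dirty[v] = false;
            var next = Update(state, edges, v);
            evaluations++;
            if (next == state[v]) continue;
            state[v] = next;
            foreach (var e in edges)
                if (e.s == v && e.t != v) dirty[e.t] = true;
        }
    }
    return evaluations;
}

// A path 0-1-2 (both directions) plus five isolated nodes 3..7.
var undirected = new[] { (0, 1), (1, 2) };
var graph = undirected.Concat(undirected.Select(e => (e.Item2, e.Item1))).ToArray();
var plain = Enumerable.Repeat(-1, 8).ToArray();
var lazy = Enumerable.Repeat(-1, 8).ToArray();
int plainEvals = PlainRounds(plain, graph);
int dirtyEvals = DirtyRounds(lazy, graph);

Console.WriteLine($"{string.Join(", ", lazy)}: {plainEvals} vs {dirtyEvals} evaluations");
// prints "0, 1, 0, 0, 0, 0, 0, 0: 16 vs 10 evaluations"
```

Both versions reach the same coloring; the savings come from the isolated nodes, which are never re-evaluated once they have settled.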

Furthermore, as with everything else in Naiad, the whole computation is incrementalized to respond efficiently to changes in its inputs. If we run the coloring computation on a random graph with 1M nodes and 5M edges, the initial execution and the subsequent 10 random edge updates take the following times:

```
Time to process: 00:00:26.3718507
Time to process: 00:00:00.0096000
Time to process: 00:00:00.0022038
Time to process: 00:00:00.0062401
Time to process: 00:00:00.0012407
Time to process: 00:00:00.0020953
Time to process: 00:00:00.0014932
Time to process: 00:00:00.0014528
Time to process: 00:00:00.0018839
Time to process: 00:00:00.0069049
Time to process: 00:00:00.0010848
...
```

While the initial run takes 26 seconds (longer than it should; the time can be reduced with some more effort), each of the incremental updates (adding and removing one edge each) takes single-digit milliseconds.

One reaction we’ve gotten from folks who like the vertex-centric programming model is that all this code is pretty complicated for something as simple as sequential iteration. However, the code above doesn’t need to be specific to graph coloring: it takes only a few lines of changes to make it generic with respect to the type of state (“S” rather than “int”), and to take the update function, from an enumeration of states to an updated state, as a parameter. You implement the above only once, and users just need to call a function with the prototype:

```csharp
// sequential iteration for generic data types and update function
Collection<S, T> SeqIteration<S, T>(Collection<S, T> start,
                                    Collection<IntPair, T> graph,
                                    Func<IEnumerable<S>, S> update);
```

which is the same prototype you would use for the for-loop at the start of the post, except that you explicitly specify the graph. You get the parallelization and incremental updates without a single line of new code.

So, in conclusion, graph-oriented sequential iteration can be implemented in standard data-parallel systems, and in an incremental data-parallel system like Naiad it can be pretty efficient. While there may still be good reasons to build systems specialized for this task, the belief that standard systems cannot do this work is not among them. Although standard data-parallel systems may not perform as well as specialized systems, incremental and differential dataflow systems should close this gap, as in principle they need not perform any more work than the specialized systems.