Regular readers of this blog might notice that the Naiad team is rather fond of defining new kinds of dataflow. Just yesterday, we did it again with a paper that defines “timely dataflow”. In this post, I’m going to introduce timely dataflow at a high level, and follow-up posts will talk more about the distributed Naiad system that we have built on top of it.

Why dataflow?

Dataflow is a popular abstraction for parallel programming, because it is composable. Instead of having to reason about the global state of a system, a dataflow programmer can concentrate on writing relatively simple vertices that maintain local state and only communicate with other parts of the system through well-defined edges. You can think of MapReduce as a static form of dataflow (with two kinds of user-defined vertex, the mapper and the reducer), where higher-level languages like HiveQL enable programmers to build up more complicated query graphs. Newer systems like DryadLINQ and Spark use modern language features for functional programming to build dataflow graphs that run on a cluster. All of these systems (and many more) use data- and task-parallelism to scale out across large compute clusters, as I discussed in a previous post.

Why timely dataflow?

We started this project with a seemingly simple requirement: we sought a distributed execution layer for Naiad’s differential dataflow programs, which require support for iteration, and much lower latency than existing iterative dataflow systems provide. We identified a set of three features that would enable us to run differential dataflow programs:

• Stateful vertices that can consume and produce messages asynchronously. Unlike many scale-out dataflow systems, which use stateless vertices, we require that vertices can make low-latency random reads and writes to mutable state. Without state, incremental processing is inefficient, because a stateless vertex must transmit its entire state to the next vertex that might use it; with state, algorithms that make sparse updates (such as many graph algorithms) can be much more efficient.
• Structured loops that allow feedback in the dataflow. Loops make it possible to implement iteration, and easier to re-use state from one iteration to the next. Iterative dataflow systems like CIEL and Spark implement iteration by unrolling loops into acyclic graphs, which makes it much harder for subsequent iterations to share state (and also creates a garbage collection problem as the graphs become large).
• Explicit notifications when all records for a given round of input or loop iteration have been received. Asynchronous processing is great for some algorithms, but others require synchronization, including such simple algorithms as counting all of the records in a system at a given point in time. Notifications allow the system to make progress when the stream of incoming records is potentially unbounded, and are crucial for producing consistent outputs, particularly for sub-computations so that they may be composed in a dataflow graph.

No existing system provides all of these features or otherwise satisfies our requirements for a high-throughput, low-latency, incremental, and iterative system. This led us to create a new model: timely dataflow.

What is timely dataflow?

Timely dataflow adds timestamps to the messages that flow between vertices in a dataflow graph. While simple integer timestamps are an old idea, timely dataflow timestamps have a more structured form:

$\text{Timestamp} : (\overbrace{\vphantom{\langle c_1,\dots,c_k \rangle \in \mathbb{N}^k}e \in \mathbb{N}}^{\text{epoch}},\; \overbrace{\langle c_1,\dots,c_k \rangle \in \mathbb{N}^k}^{\text{loop counters}})$

The epoch corresponds to an integer timestamp in a conventional stream processor: input vertices label incoming data with an epoch that indicates the order in which that data should be processed. The easiest way to think of an epoch is as a sequence number that increments with each message (or batch of messages), but there is no requirement for epochs to arrive in strictly increasing order. To make progress, however, the input must indicate when no more messages from a given epoch (or earlier) will be produced.

The loop counters are the more exotic component of a timestamp, and hint at the fact that we designed timely dataflow to support cyclic dataflow graphs. In particular, Naiad is designed to run differential dataflow programs that include incremental and iterative computation, in addition to other forms of iterative computation. A timestamp has $k \ge 0$ loop counters, where $k$ is the depth of nesting. Consider this simple timely dataflow graph, which shows a loop context nested in the top-level streaming context.

Vertices A and D are not in any loop context, so timestamps at these vertices have no loop counters; vertices B, C, and F are nested in a single loop context, so their timestamps have a single loop counter. The Ingress (I) and Egress (E) vertices sit on the boundary of a loop context, and respectively add and remove loop counters to and from timestamps as messages pass through them. Timely dataflow places no limit on the depth of nesting, so it can be used to implement nested iterative algorithms like SCC.
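To make the role of the Ingress and Egress vertices concrete, here is a minimal C# sketch of a timestamp whose loop counters grow and shrink at a loop boundary. The `Timestamp` class and its `Ingress` and `Egress` methods are hypothetical names chosen for illustration, not Naiad's actual types, and starting a new loop counter at zero is an assumption.

```csharp
using System;
using System.Linq;

// Hypothetical illustration type; not Naiad's actual timestamp representation.
public sealed class Timestamp
{
    public int Epoch { get; }
    public int[] LoopCounters { get; }

    public Timestamp(int epoch, params int[] loopCounters)
    {
        Epoch = epoch;
        LoopCounters = loopCounters;
    }

    // Entering a loop context appends one loop counter.
    // Assumption: a fresh counter starts at zero.
    public Timestamp Ingress() =>
        new Timestamp(Epoch, LoopCounters.Concat(new[] { 0 }).ToArray());

    // Leaving a loop context removes the innermost (rightmost) loop counter.
    public Timestamp Egress()
    {
        if (LoopCounters.Length == 0)
            throw new InvalidOperationException("Timestamp is not inside a loop context.");
        return new Timestamp(Epoch, LoopCounters.Take(LoopCounters.Length - 1).ToArray());
    }
}
```

Under these assumptions, `new Timestamp(3).Ingress()` has epoch 3 and a single loop counter, and calling `Egress()` on the result gives back a timestamp with no loop counters, which is the shape seen by vertices outside the loop.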

What’s the programming model?

As in other dataflow models, the main programming element in a timely dataflow program is the vertex. A timely dataflow vertex is a possibly stateful object that sends and receives messages, and requests and receives notifications.

Message exchange is completely asynchronous. A vertex $u$ sends a message by invoking $u.\textsc{SendBy}(e : \text{Edge}, m : \text{Message}, t : \text{Timestamp})$, and the message is delivered to the vertex $v$ at the other end of the edge in a callback, $v.\textsc{OnRecv}(e, m, t)$.

Notification delivery is synchronous. A vertex $v$ requests a notification by invoking $v.\textsc{NotifyAt}(t : \text{Timestamp})$, and it will be delivered in a callback, $v.\textsc{OnNotify}(t)$, after all messages with timestamp $t' \le t$ have been delivered, where timestamps are ordered lexicographically (effectively, in the order that would result from sorting first by the epoch, then by the first loop counter, then the second, and so on).
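The lexicographic order described above can be sketched as a small comparison function. This is an illustrative sketch, not Naiad's implementation: the `TimestampOrder` class and `Compare` method are hypothetical names, and the sketch assumes that the two timestamps being compared have the same number of loop counters (as they would at a single vertex).

```csharp
using System;

// Hypothetical helper; the name and signature are illustrative only.
public static class TimestampOrder
{
    // Returns a negative number, zero, or a positive number when timestamp A
    // is respectively before, equal to, or after timestamp B.
    // Assumption: both timestamps have the same nesting depth.
    public static int Compare(int epochA, int[] countersA, int epochB, int[] countersB)
    {
        if (countersA.Length != countersB.Length)
            throw new ArgumentException("Timestamps must have the same nesting depth.");

        // Sort first by the epoch...
        int byEpoch = epochA.CompareTo(epochB);
        if (byEpoch != 0) return byEpoch;

        // ...then by the first loop counter, then the second, and so on.
        for (int i = 0; i < countersA.Length; i++)
        {
            int byCounter = countersA[i].CompareTo(countersB[i]);
            if (byCounter != 0) return byCounter;
        }
        return 0;
    }
}
```

For instance, `TimestampOrder.Compare(1, new[] { 5 }, 2, new[] { 0 })` is negative, because the epoch is compared before any loop counter.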

Vertices can modify arbitrary state in their callbacks, and send messages on any of their outgoing edges. The main constraint is that messages and notifications never go backwards in time: the timestamps on messages and notifications must be greater than or equal to the time at which the callback was invoked. Feedback vertices (such as F in the graph above) increment the rightmost loop counter in a timestamp, and must appear on every cycle in a timely dataflow graph. Together, these constraints ensure that messages cannot circulate forever, which means that notifications will eventually be delivered.
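As a rough illustration of why these two constraints bound circulation, the following C# sketch sends one message around a single loop and checks each hop against the "never backwards in time" rule. It simplifies timestamps to an (epoch, loop counter) pair, which assumes exactly one level of loop nesting. `LoopDemo`, `Feedback`, `CheckNotBackwards`, and `Circulate` are hypothetical names, and the code relies on C# 7 value tuples, whose built-in `CompareTo` is lexicographic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch with a single loop counter; not part of Naiad's API.
public static class LoopDemo
{
    // A feedback vertex increments the rightmost (here, the only) loop counter.
    public static (int Epoch, int Counter) Feedback((int Epoch, int Counter) t) =>
        (t.Epoch, t.Counter + 1);

    // Throws if an outgoing timestamp is earlier than the time of the callback
    // that produced it.
    public static void CheckNotBackwards((int, int) callbackTime, (int, int) outgoingTime)
    {
        if (outgoingTime.CompareTo(callbackTime) < 0)
            throw new InvalidOperationException("Messages must not go backwards in time.");
    }

    // Sends one message around the loop `trips` times and returns every
    // timestamp it carried, starting with `start`.
    public static List<(int Epoch, int Counter)> Circulate((int Epoch, int Counter) start, int trips)
    {
        var seen = new List<(int Epoch, int Counter)> { start };
        var current = start;
        for (int i = 0; i < trips; i++)
        {
            var next = Feedback(current);
            CheckNotBackwards(current, next);
            seen.Add(next);
            current = next;
        }
        return seen;
    }
}
```

Because every trip through the feedback vertex strictly increases the timestamp, only finitely many trips can carry a timestamp at or before any given time $t$, so a notification requested at $t$ is not blocked indefinitely by a circulating message.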

Using these callbacks, it is straightforward to implement many common patterns. For example, a Select vertex that applies a function to all of its incoming messages can be implemented using only $\textsc{OnRecv}$:

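class SelectVertex<R, S, T> : Vertex<T> {
    private Func<R, S> selector;

    // Applies the selector to each incoming message and forwards the result
    // with the same timestamp.
    public override void OnRecv(Edge input, R message, T time) {
        this.SendBy(this.output, this.selector(message), time);
    }
}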


Synchronous notification is useful when the output depends on all of the input at a given time. For example, here is a simple Count vertex that outputs the number of incoming messages at each time:

class CountVertex<S, T> : Vertex<T> {
    private Dictionary<T, int> counts = new Dictionary<T, int>();

    // Updates count for the incoming message's timestamp.
    public override void OnRecv(Edge input, S message, T time) {
        if (!this.counts.ContainsKey(time)) {
            this.counts[time] = 1;
            this.NotifyAt(time); // Request a notification for this time.
        } else {
            this.counts[time] = this.counts[time] + 1;
        }
    }

    // Sends count for the notification's timestamp.
    public override void OnNotify(T time) {
        this.SendBy(this.output, this.counts[time], time);
        this.counts.Remove(time);
    }
}


More complicated vertex implementations are possible. For example, the Count vertex could be made incremental, by keeping track of the counts at previous times and including them in the results it outputs. More generally, we have implemented all of the differential dataflow LINQ operators (and more) as timely dataflow vertices. We’ll return to how these two forms of dataflow play together in a future post.
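One possible reading of the incremental Count vertex is a vertex that reports a running total across times. The sketch below is a standalone illustration under that reading: `IncrementalCount` is a hypothetical class that mirrors the `OnRecv` and `OnNotify` callbacks but does not derive from `Vertex<T>`, it uses plain integer epochs as times, and it assumes notifications are delivered in increasing time order.

```csharp
using System.Collections.Generic;

// Hypothetical standalone sketch; it does not use Naiad's Vertex<T> base class.
public class IncrementalCount
{
    // Counts for times that have not been notified yet.
    private readonly Dictionary<int, int> counts = new Dictionary<int, int>();

    // Running total over all times notified so far.
    private int total = 0;

    // Updates the count for the incoming message's time. Returns true for the
    // first message at that time, which is when a real vertex would call NotifyAt.
    public bool OnRecv(int time)
    {
        if (!counts.ContainsKey(time))
        {
            counts[time] = 1;
            return true;
        }
        counts[time] += 1;
        return false;
    }

    // Returns the cumulative count up to and including `time`.
    // Assumption: notifications arrive in increasing time order.
    public int OnNotify(int time)
    {
        if (counts.TryGetValue(time, out int count))
        {
            total += count;
            counts.Remove(time);
        }
        return total;
    }
}
```

With two messages at time 0 and one at time 1, `OnNotify(0)` returns 2 and `OnNotify(1)` returns 3. Only the running total is retained once a time has been notified, so the state stays small even as the number of completed times grows.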