
Running distributed Naiad programs

October 18, 2012

Naiad is designed to take advantage of distributed compute clusters: a large number of processor cores enables Naiad to work faster by processing data in parallel, while the aggregate RAM of a cluster enables Naiad to scale to larger problem instances. In this post, I’ll talk a bit about how data-parallelism in Naiad works from an architectural point of view, then go into the details of how to write and run your own distributed Naiad programs. To follow along with the steps, I’d recommend downloading the Naiad source code and running some of the example applications in the Examples project, as I’ll be using these in the step-by-step instructions below.

Data, task and pipeline parallelism

In Naiad, much of the parallel speedup that we can achieve comes from data parallelism, which arises when we have a large data set and can compute on different parts of it independently. Many of the important Naiad operators (like Join, GroupBy, Min, and Count) take a “key selector”, which extracts a key from each record (typically an ID) and indicates to the system that records with different keys can be processed independently. If you’re familiar with MapReduce, this is the same insight that enables the reduce stage to execute in parallel: the map stage emits a collection of key-value pairs, then the shuffle stage rearranges the data so that all pairs with the same key are in one place, but different keys may be handled by different workers. In Naiad, every operator with a key selector is parallelized by partitioning the range of key values across Naiad workers, and introducing a data exchange before the operator if necessary.
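To make the routing concrete, here is a minimal sketch of the idea. This is not Naiad's actual exchange code, and the `WorkerFor` function is a hypothetical illustration: the real implementation uses its own hashing, but the principle is the same, in that records with equal keys always land on the same worker.

```csharp
using System;

public static class PartitionSketch
{
    // Hypothetical routing rule, not Naiad's internal API: hash the extracted
    // key, then map it onto one of the workers.
    public static int WorkerFor(int key, int numWorkers)
    {
        int hash = key.GetHashCode();                           // for int, this is the value itself
        return ((hash % numWorkers) + numWorkers) % numWorkers; // keep the result non-negative
    }

    public static void Main()
    {
        // Route graph edges by their source node, as a key selector like
        // edge => edge.src would in a GroupBy over 4 workers.
        var edges = new[] { (src: 0, dst: 1), (src: 1, dst: 2), (src: 4, dst: 5) };
        foreach (var e in edges)
            Console.WriteLine($"edge ({e.src},{e.dst}) -> worker {WorkerFor(e.src, 4)}");
        // edges with sources 0 and 4 land on the same worker (0), source 1 on worker 1
    }
}
```

Because the assignment depends only on the key, every worker can process its share of the keyspace without coordinating with the others.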

Naiad also supports other forms of parallelism. For example, each operator is implemented as a miniature task, which does some combination of reading data from input channels, processing the incoming data, and sending data on output channels, and this creates the potential for task parallelism. If the query plan forms a graph that is more interesting than a simple chain of operators, there will be several runnable tasks at any one time, and a Naiad worker can work on these without needing to coordinate with the other workers. This architecture also permits pipeline parallelism: while some workers process the later stages of a query on one epoch, other workers can start working on the earlier stages of the query on some later epoch. These techniques help to keep a Naiad cluster busy doing useful work, and this is a theme to which we will return in later posts.

Naiad workers and terminology

At this point, it might be helpful to say more about the architecture of a running Naiad computation. I’ve already hinted at the existence of Naiad “workers”, which can execute code in parallel, but what is a “worker”? In the current version of Naiad, we have the following concepts:

A worker is the smallest unit of parallel computation resource in Naiad, and it corresponds to a single thread that is pinned to one CPU.
A process is a larger unit of parallel computation resource in Naiad: it corresponds to a single operating system process that runs on a single computer. A process can contain one or more workers, and a machine may host one or more processes.
A vertex is a partition of a single Naiad operator (e.g. a Join) that maintains the state and performs the computation for a partition of the keyspace. A worker will perform the work for one or more vertices, and can in principle handle multiple vertices of the same operator, but the default policy is to assign one vertex of each operator to each worker.

Naiad takes care of all data exchange between workers: between two vertices on the same worker it can pipeline records directly; between two workers in the same process it uses a lock-free queue; and between two different processes it automatically generates efficient serialization code and sends the records over a TCP connection.
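The choice among these three mechanisms can be sketched as a simple decision based on where the source and destination vertices live. The names below are illustrative, not Naiad's internal API:

```csharp
using System;

public static class ChannelSketch
{
    public enum ChannelKind { Pipeline, SharedMemoryQueue, TcpWithSerialization }

    // A (process, worker) pair locates a vertex; workers are local to a process.
    public static ChannelKind ChannelBetween(
        int srcProcess, int srcWorker, int dstProcess, int dstWorker)
    {
        if (srcProcess != dstProcess)
            return ChannelKind.TcpWithSerialization; // cross-process: serialize and send over TCP
        if (srcWorker != dstWorker)
            return ChannelKind.SharedMemoryQueue;    // same process: lock-free queue, no serialization
        return ChannelKind.Pipeline;                 // same worker: pass records along directly
    }

    public static void Main()
    {
        Console.WriteLine(ChannelBetween(0, 0, 0, 0)); // Pipeline
        Console.WriteLine(ChannelBetween(0, 0, 0, 1)); // SharedMemoryQueue
        Console.WriteLine(ChannelBetween(0, 0, 1, 0)); // TcpWithSerialization
    }
}
```

The important consequence is the cost ordering: staying on one worker is cheapest, crossing workers within a process costs a queue operation, and crossing processes additionally pays for serialization and the network.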

Running the examples in parallel

Let’s now see how this translates into practice. At this point, you should make sure that you have downloaded and built the example programs (N.B. I recommend building the solution in Release mode, as this generates much faster code), and can run the connected components example application:

> Examples.exe connectedcomponents 1000000 2000000
Time to process: 00:00:18.5217276

The default behavior (with the above command-line parameters) is to run a single-process, single-worker Naiad computation. We can use the “-t” parameter to set the number of workers (threads) that the process uses:

> Examples.exe connectedcomponents 1000000 2000000 -t 2
Time to process: 00:00:11.8998532

Notice that the time (at least on my quad-core laptop) has improved from 18.5 to 11.9 seconds. The exact numbers you see will depend on your exact processor type and speed, but as long as you have multiple cores, you should see some improvement.

Running the examples in distributed mode

To run the same example in a distributed setting, we simply change some of the command-line parameters. To start experimenting with this, open up a second command prompt, and run the following commands at the two prompts:

> Examples.exe connectedcomponents 1000000 2000000 -t 1 -n 2 -h localhost:2101 localhost:2102 -p 0
> Examples.exe connectedcomponents 1000000 2000000 -t 1 -n 2 -h localhost:2101 localhost:2102 -p 1

I’ve introduced a few new command-line parameters here, which I’ll take a moment to explain. The “-n” parameter indicates the number of processes that will participate in this computation, in this case 2. The “-h” parameter introduces a list of hostname-port pairs that will be the network location for each process (in order, starting with process 0). Finally, the “-p” parameter indicates the ID of the process that is being started, 0 and 1 respectively. (When experimenting with distributed mode on a single machine, you can use the “--local” parameter instead of explicitly enumerating the hostnames with “-h”, and Naiad will fill in sensible defaults.)
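To see how these flags fit together, here is a toy parser for just the flags described above. This is a hypothetical simplification for illustration only; Naiad's real argument parsing handles many more options and defaults:

```csharp
using System;
using System.Collections.Generic;

public sealed class ClusterConfig
{
    public int Threads = 1;           // -t: workers per process
    public int Processes = 1;         // -n: number of processes
    public int ProcessID = 0;         // -p: which process this is
    public List<string> Endpoints = new List<string>(); // -h: one host:port per process

    // Toy parser for the four flags above, nothing more.
    public static ClusterConfig Parse(string[] args)
    {
        var config = new ClusterConfig();
        for (int i = 0; i < args.Length; i++)
        {
            switch (args[i])
            {
                case "-t": config.Threads = int.Parse(args[++i]); break;
                case "-n": config.Processes = int.Parse(args[++i]); break;
                case "-p": config.ProcessID = int.Parse(args[++i]); break;
                case "-h": // consume host:port tokens until the next flag
                    while (i + 1 < args.Length && !args[i + 1].StartsWith("-"))
                        config.Endpoints.Add(args[++i]);
                    break;
            }
        }
        return config;
    }

    public static void Main()
    {
        var args = new[] { "-t", "1", "-n", "2",
                           "-h", "localhost:2101", "localhost:2102", "-p", "1" };
        var c = Parse(args);
        // each process finds its own listen address by indexing the shared list
        Console.WriteLine($"process {c.ProcessID} of {c.Processes} listens on {c.Endpoints[c.ProcessID]}");
    }
}
```

Note that every process receives the same host list; only the “-p” value differs between the two command lines, which is how each process finds its own endpoint.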

With these commands, we’re running Naiad with two processes, each containing a single worker. On my machine, the result looks like:

Time to process: 00:00:14.5270331

This is faster than running with a single worker, but slower than running with two workers in the same process, as expected: Naiad has to do more work when exchanging data between the workers, as it has to serialize and deserialize all of the records that are exchanged.

Finally, if you have multiple computers, and you want to use them to perform a Naiad computation, it’s simply a case of changing the hostnames to refer to the computers that you want to use. For example, if you had two eight-core computers named “computer-0” and “computer-1”, you would run the following on computer-0:

> Examples.exe connectedcomponents 1000000 2000000 -t 8 -n 2 -h computer-0:2101 computer-1:2101 -p 0

and the following on computer-1:

> Examples.exe connectedcomponents 1000000 2000000 -t 8 -n 2 -h computer-0:2101 computer-1:2101 -p 1

Writing a distributed Naiad program

In the current version, you deploy a Naiad program by running multiple copies of the same process, which are differentiated using the process ID. This deployment method might be familiar to MPI developers, since MPI also uses a single program, multiple data (SPMD) style. It has the pleasant property that most non-distributed Naiad programs can immediately be run in distributed mode, but a few caveats apply.

First, you are encouraged to use the NewComputation.FromArgs() method to parse your command-line arguments:

public static void Main(string[] args)
{
    using (var computation = NewComputation.FromArgs(ref args))
    {
        // ... your Naiad code goes here ...
    }
}

This sets up a computation (which is analogous to an MPI communicator) that reflects the number of threads and processes specified on the command-line.

Secondly, remember that all processes execute the same code. This means that, if you are loading data into a computation, you must ensure that only one process loads each record. We often use the following patterns:

var source1 = new BatchedDataSource<string>();
var input1 = computation.NewInput(source1);

var source2 = new BatchedDataSource<int>();
var input2 = computation.NewInput(source2);

// Only load data in process 0.
if (computation.Configuration.ProcessID == 0)
    source1.OnNext(new string[] { "hello", "world" });

var bigArray = new int[] { ... };

// Load data from all processes, round-robin by record index.
source2.OnNext(bigArray.Where((x, i) => i % computation.Configuration.Processes == computation.Configuration.ProcessID));

Finally, there are a couple of known limitations with the current distributed implementation:

  • All processes in a computation must have the same number of workers. Currently, each process assumes that all other processes connecting to it have the same number of workers, and uses this assumption in the data exchange.
  • The automatically-generated serialization code has been tested for a restricted set of types.
    Valid types are .NET primitive types, arrays of valid types, and structs of valid types. All serializable types must be declared public, and all of their members must be declared public and not readonly.
  • For each input, all processes must invoke OnNext() on that input the same number of times. However, different inputs may have differing numbers of epochs.
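One way to satisfy the last requirement is to fix the number of epochs up front and have every process send its share of each epoch, even when that share is an empty batch. The following sketch uses a hypothetical `Batches` helper, not part of the Naiad API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EpochSketch
{
    // Split this process's records into exactly numEpochs batches, so every
    // process can call OnNext once per epoch even if its share runs out early.
    public static List<int[]> Batches(int[] records, int numEpochs)
    {
        int batchSize = (records.Length + numEpochs - 1) / numEpochs; // ceiling division
        if (batchSize == 0) batchSize = 1;                            // all-empty input still yields numEpochs batches
        var batches = new List<int[]>();
        for (int e = 0; e < numEpochs; e++)
            batches.Add(records.Skip(e * batchSize).Take(batchSize).ToArray()); // trailing batches may be empty
        return batches;
    }

    public static void Main()
    {
        // Imagine process 0 holds 5 records and process 1 holds 2;
        // both would call source.OnNext(batch) exactly 3 times.
        foreach (var records in new[] { new[] { 1, 2, 3, 4, 5 }, new[] { 6, 7 } })
        {
            var batches = Batches(records, 3);
            Console.WriteLine(string.Join(" | ",
                batches.Select(b => "[" + string.Join(",", b) + "]")));
        }
    }
}
```

With this pattern, each process calls OnNext the same number of times regardless of how unevenly the data is distributed, which keeps the epoch counts aligned across the computation.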
