Parallelism in .NET – Part 4, Imperative Data Parallelism: Aggregation

In the article on simple data parallelism, I described how to perform an operation on an entire collection of elements in parallel.  Often, this is not adequate, as the parallel operation is going to be performing some form of aggregation.

Simple examples of this might include taking the sum of the results of processing a function on each element in the collection, or finding the minimum of the collection given some criteria.  This can be done using the techniques described in simple data parallelism, however, special care needs to be taken into account to synchronize the shared data appropriately.  The Task Parallel Library has tools to assist in this synchronization.

The main issue with aggregation when parallelizing a routine is that you need to handle synchronization of data.  Since multiple threads will need to write to a shared portion of data.  Suppose, for example, that we wanted to parallelize a simple loop that looked for the minimum value within a dataset:

double min = double.MaxValue;
foreach(var item in collection)
{
    double value = item.PerformComputation();
    min = System.Math.Min(min, value);
}

This seems like a good candidate for parallelization, but there is a problem here.  If we just wrap this into a call to Parallel.ForEach, we’ll introduce a critical race condition, and get the wrong answer.  Let’s look at what happens here:

// Buggy code!  Do not use!
double min = double.MaxValue;
Parallel.ForEach(collection, item =>
{
    double value = item.PerformComputation();
    min = System.Math.Min(min, value);
});

This code has a fatal flaw: min will be checked, then set, by multiple threads simultaneously.  Two threads may perform the check at the same time, and set the wrong value for min.  Say we get a value of 1 in thread 1, and a value of 2 in thread 2, and these two elements are the first two to run.  If both hit the min check line at the same time, both will determine that min should change, to 1 and 2 respectively.  If element 1 happens to set the variable first, then element 2 sets the min variable, we’ll detect a min value of 2 instead of 1.  This can lead to wrong answers.

Unfortunately, fixing this, with the Parallel.ForEach call we’re using, would require adding locking.  We would need to rewrite this like:

// Safe, but slow
double min = double.MaxValue;
// Make a "lock" object
object syncObject = new object(); 
Parallel.ForEach(collection, item =>
{
    double value = item.PerformComputation();
    lock(syncObject)
        min = System.Math.Min(min, value);
});

This will potentially add a huge amount of overhead to our calculation.  Since we can potentially block while waiting on the lock for every single iteration, we will most likely slow this down to where it is actually quite a bit slower than our serial implementation.  The problem is the lock statement – any time you use lock(object), you’re almost assuring reduced performance in a parallel situation.  This leads to two observations I’ll make:

When parallelizing a routine, try to avoid locks.

That being said:

Always add any and all required synchronization to avoid race conditions.

These two observations tend to be opposing forces – we often need to synchronize our algorithms, but we also want to avoid the synchronization when possible.  Looking at our routine, there is no way to directly avoid this lock, since each element is potentially being run on a separate thread, and this lock is necessary in order for our routine to function correctly every time.

However, this isn’t the only way to design this routine to implement this algorithm.  Realize that, although our collection may have thousands or even millions of elements, we have a limited number of Processing Elements (PE).  Processing Element is the standard term for a hardware element which can process and execute instructions.  This typically is a core in your processor, but many modern systems have multiple hardware execution threads per core.  The Task Parallel Library will not execute the work for each item in the collection as a separate work item. Instead, when Parallel.ForEach executes, it will partition the collection into larger “chunks” which get processed on different threads via the ThreadPool.  This helps reduce the threading overhead, and help the overall speed.  In general, the Parallel class will only use one thread per PE in the system.

Given the fact that there are typically fewer threads than work items, we can rethink our algorithm design.  We can parallelize our algorithm more effectively by approaching it differently.  Because the basic aggregation we are doing here (Min) is communitive, we do not need to perform this in a given order.  We knew this to be true already – otherwise, we wouldn’t have been able to parallelize this routine in the first place.  With this in mind, we can treat each thread’s work independently, allowing each thread to serially process many elements with no locking, then, after all the threads are complete, “merge” together the results.

This can be accomplished via a different set of overloads in the Parallel class: Parallel.ForEach<TSource,TLocal>.  The idea behind these overloads is to allow each thread to begin by initializing some local state (TLocal).  The thread will then process an entire set of items in the source collection, providing that state to the delegate which processes an individual item.  Finally, at the end, a separate delegate is run which allows you to handle merging that local state into your final results.

To rewriting our routine using Parallel.ForEach<TSource,TLocal>, we need to provide three delegates instead of one.  The most basic version of this function is declared as:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally
)

The first delegate (the localInit argument) is defined as Func<TLocal>.  This delegate initializes our local state.  It should return some object we can use to track the results of a single thread’s operations.

The second delegate (the body argument) is where our main processing occurs, although now, instead of being an Action<T>, we actually provide a Func<TSource, ParallelLoopState, TLocal, TLocal> delegate.  This delegate will receive three arguments: our original element from the collection (TSource), a ParallelLoopState which we can use for early termination, and the instance of our local state we created (TLocal).  It should do whatever processing you wish to occur per element, then return the value of the local state after processing is completed.

The third delegate (the localFinally argument) is defined as Action<TLocal>.  This delegate is passed our local state after it’s been processed by all of the elements this thread will handle.  This is where you can merge your final results together.  This may require synchronization, but now, instead of synchronizing once per element (potentially millions of times), you’ll only have to synchronize once per thread, which is an ideal situation.

Now that I’ve explained how this works, lets look at the code:

// Safe, and fast!
double min = double.MaxValue;
// Make a "lock" object
object syncObject = new object(); 
Parallel.ForEach(
    collection, 
    // First, we provide a local state initialization delegate.
    () => double.MaxValue,
    // Next, we supply the body, which takes the original item, loop state,
    // and local state, and returns a new local state
    (item, loopState, localState) =>
    {
        double value = item.PerformComputation();
        return System.Math.Min(localState, value);
    },
    // Finally, we provide an Action<TLocal>, to "merge" results together
    localState =>
    {
        // This requires locking, but it's only once per used thread
        lock(syncObj)
            min = System.Math.Min(min, localState);
    }
);

Although this is a bit more complicated than the previous version, it is now both thread-safe, and has minimal locking. 

This same approach can be used by Parallel.For, although now, it’s Parallel.For<TLocal>.  When working with Parallel.For<TLocal>, you use the same triplet of delegates, with the same purpose and results.

Also, many times, you can completely avoid locking by using a method of the Interlocked class to perform the final aggregation in an atomic operation.  The MSDN example demonstrating this same technique using Parallel.For uses the Interlocked class instead of a lock, since they are doing a sum operation on a long variable, which is possible via Interlocked.Add.

By taking advantage of local state, we can use the Parallel class methods to parallelize algorithms such as aggregation, which, at first, may seem like poor candidates for parallelization.  Doing so requires careful consideration, and often requires a slight redesign of the algorithm, but the performance gains can be significant if handled in a way to avoid excessive synchronization.

About Reed
Reed Copsey, Jr. - http://www.reedcopsey.com - http://twitter.com/ReedCopsey

Comments

11 Responses to “Parallelism in .NET – Part 4, Imperative Data Parallelism: Aggregation”
  1. Ed Ball says:

    Great articles! I really appreciate this relaxed introduction to the new APIs.

  2. Reed says:

    Thanks for the feedback! I’m going to keep them coming…

  3. Jeff Sternal says:

    Thanks for the excellent series!

    Do I have this right?

    Behind the scenes, Parallel.ForEach is creating a ThreadLocal to store the local state, using the ThreadLocal(Func) constructor. So the TPL decides how many threads it will use and executes your local state initialization delegate once for each thread (on the first iteration handled by that thread).

    In a world without ThreadLocal, .NET would need a factory method to create an object instance to host the aggregation delegate, which would be significantly more cumbersome.

    • Jeff Sternal says:

      “In a world without ThreadLocal, .NET would need a factory method to create an object instance to host the aggregation delegate, which would be significantly more cumbersome.”

      I should have written “to host the aggregation delegate *and maintain a per-thread accumulator variable*, which would be significantly more cumbersome.”

    • Reed says:

      Basically, yes, I believe this is (generally) what’s happening underneath. Your “local state” variable is actually being stored in a ThreadLocal, which is then used in each iteration (value passed into state, returned, and reset to the local thread state). When you do the final delegate, you are passed the accumulated results of the ThreadLocal variable, which you can then use to do your aggregation. Finally, the ThreadLocal is disposed of, cleaning it up.

      Without doing this, you’d need to either add locking, or put in some form of Dictionary object that could be used to store the local state – either of which would add quite a bit of contention, and slow things down.

  4. Naim says:

    Great article! I like the way it’s been done, even the final action has been made part of the Parallel.ForEach call :)

  5. Todd says:

    For a minute I thought you were gonna drop the “MapReduce” scheme here but instead you went with the LocalState.

    I am still having a hard time grasping this LocalState example and how it is different than original locking issue. In my mind I have the following:

    a) There is some kind of WorkItemQueue filled with the doubles in the collection.
    b) There is a pool of Threads where the # of threads is “generally” based on the # of processing elements (i.e. cores)
    c) The TPL will unleash all the threads in the pool at once
    d) Rathe than issuing a single WorkItem per thread, the TPL “partitions” the WorkItemQueue such that each thread gets a chunk of WorkItems.

    I am to understand that the Action(TLocal) is the delegate that is called at the conclusion of the thread/task processing each WorkItem in its chunk…head scratching.

    Also, in early comment you mentioned the overhead if a Dictionary was used. However, this may minimized with the use of the the ConcurrentDictionary….rendering a MapReduce-like solution. Regardless, a lock is necessary somewhere I suppose. Ah…. never-mind….I cheated and looked ahead….I see the nice PLINQ example in Part 6 :-). Nice!

Trackbacks

Check out what others are saying about this post...
  1. […] When working with a problem that can be decomposed by data, we have a collection, and some operation being performed upon the collection.  I’ve demonstrated how this can be parallelized using the Task Parallel Library and imperative programming using imperative data parallelism via the Parallel class.  While this provides a huge step forward in terms of power and capabilities, in many cases, special care must still be given for relative common scenarios. […]

  2. “Parallelism in .NET” Series by Reed Copsey, Jr….

    Reed Copsey, Jr. has been writing a great series of articles on parallelism with the .NET Framework 4…

  3. […] information on synchronized parallel programming, take a look at http://reedcopsey.com/2010/01/22/parallelism-in-net-part-4-imperative-data-parallelism-aggregation/. The author goes over how you can synchronize dependent data using the .NET 4.0 multithreading […]



Speak Your Mind

Tell us what you're thinking...
and oh, if you want a pic to show with your comment, go get a gravatar!