Parallelism in .NET – Part 9, Configuration in PLINQ and TPL

Parallel LINQ and the Task Parallel Library contain many options for configuration.  Although the default configuration options are often ideal, there are times when customizing the behavior is desirable.  Both frameworks provide full configuration support.

When working with Data Parallelism, there is one primary configuration option we often need to control – the number of threads we want the system to use when parallelizing our routine.  By default, PLINQ and the TPL both use the ThreadPool to schedule tasks.  Given the major improvements in the ThreadPool in CLR 4, this default behavior is often ideal. 

However, there are times that the default behavior is not appropriate.  For example, if you are working on multiple threads simultaneously, and want to schedule parallel operations from within both threads, you might want to consider restricting each parallel operation to using a subset of the processing cores of the system.  Not doing this might over-parallelize your routine, which leads to inefficiencies from having too many context switches.

In the Task Parallel Library, configuration is handled via the ParallelOptions class.  All of the methods of the Parallel class have an overload which accepts a ParallelOptions argument.

We configure the Parallel class by setting the ParallelOptions.MaxDegreeOfParallelism property.  For example, let’s revisit one of the simple data parallel examples from Part 2:

Parallel.For(0, pixelData.GetUpperBound(0), row =>
{
    for (int col=0; col < pixelData.GetUpperBound(1); ++col)
    {
        pixelData[row, col] = AdjustContrast(pixelData[row, col], minPixel, maxPixel);
    }
});

Here, we’re looping through an image, and calling a method on each pixel in the image.  If this was being done on a separate thread, and we knew another thread within our system was going to be doing a similar operation, we likely would want to restrict this to using half of the cores on the system.  This could be accomplished easily by doing:

var options = new ParallelOptions();
options.MaxDegreeOfParallelism = Math.Max(Environment.ProcessorCount / 2, 1);

Parallel.For(0, pixelData.GetUpperBound(0), options, row =>
{
    for (int col=0; col < pixelData.GetUpperBound(1); ++col)
    {
        pixelData[row, col] = AdjustContrast(pixelData[row, col], minPixel, maxPixel);
    }
});

Now, we’re restricting this routine to using no more than half the cores in our system.  Note that I included a check to prevent a single core system from supplying zero; without this check, we’d potentially cause an exception.  I also did not hard code a specific value for the MaxDegreeOfParallelism property.  One of our goals when parallelizing a routine is allowing it to scale on better hardware.  Specifying a hard-coded value would contradict that goal.

Parallel LINQ also supports configuration, and in fact, has quite a few more options for configuring the system.  The main configuration option we most often need is the same as our TPL option: we need to supply the maximum number of processing threads.  In PLINQ, this is done via a new extension method on ParallelQuery<T>: ParallelEnumerable.WithDegreeOfParallelism.

Let’s revisit our declarative data parallelism sample from Part 6:

double min = collection.AsParallel().Min(item => item.PerformComputation());

Here, we’re performing a computation on each element in the collection, and saving the minimum value of this operation.  If we wanted to restrict this to a limited number of threads, we would add our new extension method:

int maxThreads = Math.Max(Environment.ProcessorCount / 2, 1);
double min = collection
                 .AsParallel()
                 .WithDegreeOfParallelism(maxThreads)
                 .Min(item => item.PerformComputation());

This automatically restricts the PLINQ query to half of the threads on the system.

PLINQ provides some additional configuration options.  By default, PLINQ will occasionally revert to processing a query in parallel.  This occurs because many queries, if parallelized, typically actually cause an overall slowdown compared to a serial processing equivalent.  By analyzing the “shape” of the query, PLINQ often decides to run a query serially instead of in parallel.  This can occur for (taken from MSDN):

  • Queries that contain a Select, indexed Where, indexed SelectMany, or ElementAt clause after an ordering or filtering operator that has removed or rearranged original indices.
  • Queries that contain a Take, TakeWhile, Skip, SkipWhile operator and where indices in the source sequence are not in the original order.
  • Queries that contain Zip or SequenceEquals, unless one of the data sources has an originally ordered index and the other data source is indexable (i.e. an array or IList(T)).
  • Queries that contain Concat, unless it is applied to indexable data sources.
  • Queries that contain Reverse, unless applied to an indexable data source.

If the specific query follows these rules, PLINQ will run the query on a single thread.  However, none of these rules look at the specific work being done in the delegates, only at the “shape” of the query.  There are cases where running in parallel may still be beneficial, even if the shape is one where it typically parallelizes poorly.  In these cases, you can override the default behavior by using the WithExecutionMode extension method.  This would be done like so:

var reversed = collection
                  .AsParallel()
                  .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                  .Select(i => i.PerformComputation())
                  .Reverse();

Here, the default behavior would be to not parallelize the query unless collection implemented IList<T>.  We can force this to run in parallel by adding the WithExecutionMode extension method in the method chain.

Finally, PLINQ has the ability to configure how results are returned.  When a query is filtering or selecting an input collection, the results will need to be streamed back into a single IEnumerable<T> result.  For example, the method above returns a new, reversed collection.  In this case, the processing of the collection will be done in parallel, but the results need to be streamed back to the caller serially, so they can be enumerated on a single thread.

This streaming introduces overhead.  IEnumerable<T> isn’t designed with thread safety in mind, so the system needs to handle merging the parallel processes back into a single stream, which introduces synchronization issues.  There are two extremes of how this could be accomplished, but both extremes have disadvantages.

The system could watch each thread, and whenever a thread produces a result, take that result and send it back to the caller.  This would mean that the calling thread would have access to the data as soon as data is available, which is the benefit of this approach.  However, it also means that every item is introducing synchronization overhead, since each item needs to be merged individually.

On the other extreme, the system could wait until all of the results from all of the threads were ready, then push all of the results back to the calling thread in one shot.  The advantage here is that the least amount of synchronization is added to the system, which means the query will, on a whole, run the fastest.  However, the calling thread will have to wait for all elements to be processed, so this could introduce a long delay between when a parallel query begins and when results are returned.

The default behavior in PLINQ is actually between these two extremes.  By default, PLINQ maintains an internal buffer, and chooses an optimal buffer size to maintain.  Query results are accumulated into the buffer, then returned in the IEnumerable<T> result in chunks.  This provides reasonably fast access to the results, as well as good overall throughput, in most scenarios.

However, if we know the nature of our algorithm, we may decide we would prefer one of the other extremes.  This can be done by using the WithMergeOptions extension method.  For example, if we know that our PerformComputation() routine is very slow, but also variable in runtime, we may want to retrieve results as they are available, with no bufferring.  This can be done by changing our above routine to:

var reversed = collection
                  .AsParallel()
                  .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                  .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                  .Select(i => i.PerformComputation())
                  .Reverse();

On the other hand, if are already on a background thread, and we want to allow the system to maximize its speed, we might want to allow the system to fully buffer the results:

var reversed = collection
                  .AsParallel()
                  .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                  .WithMergeOptions(ParallelMergeOptions.FullyBuffered)
                  .Select(i => i.PerformComputation())
                  .Reverse();

Notice, also, that you can specify multiple configuration options in a parallel query.  By chaining these extension methods together, we generate a query that will always run in parallel, and will always complete before making the results available in our IEnumerable<T>.

About Reed
Reed Copsey, Jr. - http://www.reedcopsey.com - http://twitter.com/ReedCopsey

Comments

One Response to “Parallelism in .NET – Part 9, Configuration in PLINQ and TPL”

Trackbacks

Check out what others are saying about this post...
  1. [...] the same cancellation model, as well.  Here, we supply our CancellationToken as part of the configuration.  The ParallelOptions class contains a property for the CancellationToken.  This allows [...]



Speak Your Mind

Tell us what you're thinking...
and oh, if you want a pic to show with your comment, go get a gravatar!