Parallelism in .NET – Part 18, Task Continuations with Multiple Tasks
In my introduction to Task continuations, I demonstrated how the Task class provides a more expressive alternative to traditional callbacks. Task continuations provide a much cleaner syntax than traditional callbacks, but there are other reasons to switch to using continuations…
Task continuations provide a clean syntax and a simple, elegant means of synchronizing asynchronous method results with the user interface. In addition, they make it just as easy to work with collections of tasks.
Prior to .NET 4, working with multiple related asynchronous method calls was very tricky. If, for example, we wanted to run two asynchronous operations, followed by a single method call which we wanted to run when the first two methods completed, we’d have to program all of the handling ourselves. We would likely need to take some approach such as using a shared callback which synchronized against a common variable, or using a WaitHandle shared within the callbacks to allow one to wait for the second. Although this could be made to work, it requires manually placing this handling into every algorithm which requires this form of coordination. This is tedious and error prone, and can easily lead to subtle bugs.
Similar to how the Task class provides static methods to block until multiple tasks have completed, TaskFactory contains methods which allow a continuation to be scheduled upon the completion of multiple tasks. The first of these is TaskFactory.ContinueWhenAll, an instance method which is typically called through the Task.Factory property.
This allows you to easily specify a single delegate to run when a collection of tasks has completed. For example, suppose we have a class which fetches data from the network. This can be a long running operation, and may fail in certain situations, such as a server being down. As a result, we have three separate servers which we will “query” for our information. Now, suppose we want to grab data from all three servers, and verify that the results are the same from all three.
With traditional asynchronous programming in .NET, this would require using three separate callbacks, and managing the synchronization between the various operations ourselves. The Task and TaskFactory classes simplify this for us, allowing us to write:
var server1 = Task.Factory.StartNew( () => networkClass.GetResults(firstServer) );
var server2 = Task.Factory.StartNew( () => networkClass.GetResults(secondServer) );
var server3 = Task.Factory.StartNew( () => networkClass.GetResults(thirdServer) );

var result = Task.Factory.ContinueWhenAll(
    new[] {server1, server2, server3 },
    (tasks) =>
    {
        // Propagate exceptions (see below)
        Task.WaitAll(tasks);

        return this.CompareTaskResults(
            tasks[0].Result,
            tasks[1].Result,
            tasks[2].Result);
    });
This is clean, simple, and elegant. The one complication is the Task.WaitAll(tasks); statement.
Although the continuation will not run until all three tasks (server1, server2, and server3) have completed, there is a potential snag. If the networkClass.GetResults method fails and raises an exception, we want to make sure to handle it cleanly. By using Task.WaitAll, any exceptions raised within any of our original tasks will get wrapped into a single AggregateException by the WaitAll method, providing us a simplified means of handling the exceptions. If we wait on the continuation, we can trap this AggregateException and handle it cleanly. Without this line, reading the three Result properties in order would throw at the first failed task, so an exception raised by a later task could remain unobserved, which later might trigger a nasty UnobservedTaskException. This would happen any time two or more of our original tasks failed.
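To make the exception flow concrete, here is a minimal sketch of waiting on the continuation and trapping the AggregateException. The GetResults method below is a hypothetical stand-in for networkClass.GetResults (it simply fails for two of the three servers), and the comparison logic is a placeholder rather than the real CompareTaskResults.

```csharp
using System;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for networkClass.GetResults:
    // only "server1" succeeds, the other servers throw.
    public static int GetResults(string server)
    {
        if (server != "server1")
            throw new InvalidOperationException(server + " is down");
        return 42;
    }

    public static Task<bool> CompareAllServers()
    {
        var server1 = Task.Factory.StartNew(() => GetResults("server1"));
        var server2 = Task.Factory.StartNew(() => GetResults("server2"));
        var server3 = Task.Factory.StartNew(() => GetResults("server3"));

        return Task.Factory.ContinueWhenAll(
            new[] { server1, server2, server3 },
            tasks =>
            {
                // Observes every failed task and rethrows all of their
                // exceptions wrapped in a single AggregateException.
                Task.WaitAll(tasks);

                // Placeholder comparison (assumption, not the original logic).
                return tasks[0].Result == tasks[1].Result
                    && tasks[1].Result == tasks[2].Result;
            });
    }

    public static void Main()
    {
        try
        {
            CompareAllServers().Wait();
        }
        catch (AggregateException ex)
        {
            // The continuation's exception wraps the one thrown by WaitAll,
            // so Flatten() is used to reach the original exceptions.
            foreach (var inner in ex.Flatten().InnerExceptions)
                Console.WriteLine(inner.Message);
        }
    }
}
```

With this setup, both failures should surface through the single catch block, and neither failed task is left with an unobserved exception.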
Just as we can schedule a continuation to occur when an entire collection of tasks has completed, we can also set up a continuation to run when any single task within a collection completes. If, for example, we didn’t need to compare the results of all three network locations, but only use one, we could still schedule three tasks. We could then have our completion logic work on the first task which completed, and ignore the others. This is done via TaskFactory.ContinueWhenAny:
var server1 = Task.Factory.StartNew( () => networkClass.GetResults(firstServer) );
var server2 = Task.Factory.StartNew( () => networkClass.GetResults(secondServer) );
var server3 = Task.Factory.StartNew( () => networkClass.GetResults(thirdServer) );

var result = Task.Factory.ContinueWhenAny(
    new[] {server1, server2, server3 },
    (firstTask) =>
    {
        return this.ProcessTaskResult(firstTask.Result);
    });
Here, instead of working with all three tasks, we’re just using the first task which finishes. This is very useful, as it allows us to easily start multiple operations, use the result of whichever finishes first, and “throw away” the others. However, you must take care when using ContinueWhenAny to properly handle exceptions. At some point, you should always wait on each task (or use the Task.Result property) in order to propagate any exceptions raised from within the task. Failing to do so can lead to an UnobservedTaskException.
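One possible way to observe failures in the tasks we ignore is sketched below: each task gets a continuation which only runs if that task faults, and which reads its Exception property. This is just one option, not the only approach. The GetResults method, its delay and fail parameters, and the server names are hypothetical stand-ins used purely for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for networkClass.GetResults, with an
    // artificial delay and an optional failure.
    public static string GetResults(string server, int delayMs, bool fail)
    {
        Thread.Sleep(delayMs);
        if (fail)
            throw new InvalidOperationException(server + " is down");
        return "data from " + server;
    }

    public static string FirstResult()
    {
        var tasks = new[]
        {
            Task.Factory.StartNew(() => GetResults("server1", 50, false)),
            Task.Factory.StartNew(() => GetResults("server2", 500, true)),
            Task.Factory.StartNew(() => GetResults("server3", 500, true)),
        };

        // Reading Task.Exception marks the exception as observed, so a
        // failure in a task we end up ignoring is still handled.
        foreach (var task in tasks)
        {
            task.ContinueWith(
                t => Console.WriteLine(
                    "Ignored failure: " + t.Exception.Flatten().InnerException.Message),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        var result = Task.Factory.ContinueWhenAny(
            tasks,
            firstTask => firstTask.Result);

        // In this sketch server1 is expected to finish first and succeed.
        // If the first task to finish had faulted, this would throw instead.
        return result.Result;
    }

    public static void Main()
    {
        Console.WriteLine(FirstResult());
    }
}
```

Note that the fault-only continuations run whenever the slower tasks eventually fail. In a short console program like this one, the process may exit before that happens.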
It would be better if you published this series as one PDF.
Nice posts
Thank you for the suggestion Meysam. I will consider this in the future. I am still planning to add onto this series, however, so there may be more to add before it would be ready for that.
I love your sample, but I am running into an issue where I am getting deadlocks from trying to execute too many tasks against SQL Server at once. The method I am executing in my task is basically recalculating claims in the database, but in order to update my tables I delete old calculation vector rows and then insert new ones based upon the recalculation values. I tried putting a lock around my execute in my DAL, but this did not fix it. Is there a way to control the number of executing tasks, or a better way for me to structure my code to handle this problem?
//int result = 0;
Task.Factory.StartNew(() =>
{
    foreach (int claimID in recalcs.Keys)
    {
        if (DAL.RecalculateClaim(claimID))
            ++result;
    }
}).ContinueWith(_ =>
{
    Console.WriteLine("Result: {0}", result);
});
Ryan,
The code you pasted will only use a single task. The errors you’re probably seeing are outside of this code, and in the DAL. I’d guess that you’re using a non-threadsafe class inside of your DAL, and doing this processing on a background thread is causing problems. It’s difficult to tell from the code sample pasted, however.
-Reed
Hello
I have an assembly with a static method. Within this method I do:
var task = Task.Factory.StartNew(() => {
LogService.Execute(request);
});
The method then returns to the application. However, LogService.Execute(request); is not executed. If I call task.Wait(); it waits, but then the logging behaves in a synchronous way, which is not wanted.
Does anyone have a tip on what the issue could be in this case?
Markus,
The code above should execute the action, but it will not happen immediately. When you call Task.Factory.StartNew, the task will be scheduled for execution (typically on a thread pool thread), and should execute soon after. task.Wait() will, of course, block until it’s complete – but it should execute regardless.
That being said, if there is something within LogService’s Execute method that is blocking, it may prevent the results from being immediately visible.
Can I aggregate the fault exceptions thrown by WCF service operations if I’m making parallel calls to multiple service operations?
Alok,
You could, though typically this would be a bad idea. Fault exceptions for WCF typically require special handling, which would be better handled in the individual tasks. Personally, I would handle the fault exceptions, then propagate my own custom exception (which would get aggregated “cleanly”).
Though I understand that one should call Task.WaitAll() in the ContinueWhenAll example above to collect all AggregateExceptions from the awaited tasks, I think that in this specific example it is not really required because accessing the Task.Result property will also throw an AggregateException when the task failed.
The problem with relying on Task.Result and not using Task.WaitAll() in this case is that you’ll only catch the first exception case. Say server1 and server3 both raised exceptions: you’d only get the exception raised for server1. This will leave an unobserved exception in the server3 Task, which will show up during finalization and tear down your process (unless you handle the TaskScheduler.UnobservedTaskException event, or are using .NET 4.5).
Hi
I’m not sure why this code is not working. It is not possible to access the Result property in the continuation; it does not exist. If you copy the code below, see var x = tasks[0].Result;
var server1 = Task.Factory.StartNew(
    () => { return; });
var server2 = Task.Factory.StartNew(
    () => { return; });
var server3 = Task.Factory.StartNew(
    () => { return; });
var result = Task.Factory.ContinueWhenAll(new[] { server1, server2, server3 },
    (tasks) =>
    {
        // Propagate exceptions (see below)
        Task.WaitAll(tasks);
        var x = tasks[0].Result;
    });
Since your delegates just use “return;”, Task.Factory.StartNew will return a Task, not a Task<TResult>. This means there is no “Result” property to access.
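As a small sketch of the distinction between the non-generic Task and Task<TResult>: the compiler picks the StartNew overload based on whether the lambda returns a value. The value 42 is arbitrary and only there for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class Program
{
    public static int Demo()
    {
        // The lambda returns nothing, so it binds to Action and
        // StartNew returns a plain Task with no Result property.
        Task noResult = Task.Factory.StartNew(() => { return; });

        // The lambda returns an int, so it binds to Func<int> and
        // StartNew returns a Task<int>, which does have Result.
        Task<int> withResult = Task.Factory.StartNew(() => 42);

        // noResult.Result would not compile; Wait() is all we can do here.
        noResult.Wait();

        return withResult.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Demo()); // prints 42
    }
}
```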
In your code example, you access the Result of a task without returning anything.
Cody,
I am actually not doing that. In my code, the delegate is using an expression body in the lambda. Since GetResults() returns values, the delegate has a return value, which means the resulting Task is a Task<TResult>.
-Reed
never mind. I got it
because it’s not returning any result