Friday, April 5, 2024

ParallelForAsync alternatives

February 2025 Note — I just found a Microsoft article published in January 2023 that discusses how to throttle Task concurrency. I've appended a section below to discuss the technique the example uses.


The Parallel.For and similar methods available in .NET Framework 4.0+ and .NET Standard 2.0+ are designed to simplify parallelism of CPU-intensive processing. The Task and related classes (released slightly earlier), combined with the C# async and await keywords, facilitate simple coding patterns for fine-grained concurrency. Those class families are intended to help with different types of problems, but there are times when it's convenient to combine them, and a typical case I have is to query and update large numbers of Azure Blobs as efficiently as possible.

NOTE: The Parallel.ForEachAsync family in .NET 6+ provides a simple and convenient way to combine parallelism and fine-grained concurrency, but I couldn't use that method because my library was compelled to target Standard 2.0. This article describes the search for an alternative older technique.

ForEach and await NO

A naïve code sample might look like this:

Parallel.ForEach(nameSource, async (name) =>
{
    await InspectBlobAsync(name, cancelToken);
});

Don't do that. I tried this with a loop over 100 Tasks and they all started simultaneously and ran in parallel. The ForEach call returns almost immediately, as soon as all the Tasks are started, because the async lambda compiles to an async void delegate that ForEach cannot await. I haven't tried this with 1000 Tasks or more, but I expect some kind of resource exhaustion or strange crash to occur eventually as you hit a runtime or operating system limit.

Task WhenAll NO

The following code seems like a natural simple solution:

var tasks = nameSource.Select(name => InspectBlobAsync(name, cancelToken)).ToList();
await Task.WhenAll(tasks);

Unfortunately, the Task ballistics in this case are similar to the previous use of ForEach. All the Tasks still start simultaneously; the difference is that WhenAll waits for all the Tasks to finish (by definition of how await works). Using WhenAll is a perfectly valid and popular coding pattern, but only when the number of Tasks is below some "reasonable" limit, which will depend on what sort of work the Tasks are doing in your environment.

Semaphore throttling NO

You will find many articles that use a SemaphoreSlim to throttle a while loop so that a maximum number of Tasks run at once. It's like a gate that opens to let a new Task in as each leaves. I found this technique worked perfectly and planned to release it, but then I discovered that cancellation was a serious problem. I couldn't find a sensible way to signal a cancellation token and get the loop to gracefully exit and ignore waiting Tasks. There may be a way, but it was too much bother so I abandoned the semaphore technique.
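For reference, here is a minimal sketch of the semaphore technique described above, assuming a throttle of 4 and a Task.Delay stand-in for the real InspectBlobAsync (the class and method names are mine, for illustration only). It runs correctly for the happy path; my caveat above about graceful cancellation still stands:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleDemo
{
    // Hypothetical stand-in for the real Blob inspection work.
    static async Task InspectBlobAsync(string name, CancellationToken cancelToken)
    {
        await Task.Delay(10, cancelToken);
    }

    public static async Task<int> RunAsync(IEnumerable<string> nameSource, CancellationToken cancelToken)
    {
        int processed = 0;
        using (var gate = new SemaphoreSlim(4))        // at most 4 Tasks in flight
        {
            var tasks = new List<Task>();
            foreach (var name in nameSource)
            {
                await gate.WaitAsync(cancelToken);     // waits here while the gate is full
                tasks.Add(Task.Run(async () =>
                {
                    try
                    {
                        await InspectBlobAsync(name, cancelToken);
                        Interlocked.Increment(ref processed);
                    }
                    finally
                    {
                        gate.Release();                // let the next Task through the gate
                    }
                }));
            }
            await Task.WhenAll(tasks);
        }
        return processed;
    }
}
```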

ConcurrentQueue YES

I finally decided to use the ConcurrentQueue class. I load the queue with the names of the thousands of Blobs, which I think is acceptable and not resource-stressful. I then create some Tasks, each containing a simple while loop which dequeues (aka pulls) the names off the queue and asynchronously processes them. The skeleton code looks like this:

int concurrentCount = 4;
using (var cts = new CancellationTokenSource())
{
  var queue = new ConcurrentQueue<string>(nameSource);
  var pullers = Enumerable.Range(1, concurrentCount).Select(seq => QueuePuller(seq, queue, cts.Token));
  await Task.WhenAll(pullers);
}

async Task QueuePuller(int seq, ConcurrentQueue<string> queue, CancellationToken cancelToken)
{
  while (queue.TryDequeue(out string name))
  {
    try
    {
      await InspectBlobAsync(name, cancelToken);
    }
    catch (OperationCanceledException)
    {
      // Report a cancellation. Break out of the loop and end the puller.
      break;
    }
    catch (Exception ex)
    {
      // Inspect the error and decide to report it or break and end the puller.
    }
  }
}

Although there is more code when using ConcurrentQueue, I feel that it's simple and sensible. The concurrentCount value becomes the parallelism throttle (like using a semaphore, but simpler) and can be adjusted as needed or set according to the number of cores. Cancellation also works simply: signalling the cancel token causes all of the pullers to exit gracefully. The full experimental C# source code is available at https://dev.azure.com/orthogonal/ParallelTasks.

BlockingCollection

It's worth mentioning the BlockingCollection class for more sophisticated scenarios. It can be used in a pattern similar to the ConcurrentQueue one above, with items pushed into and pulled from an internal collection, but it provides more control over how that happens, such as bounding the capacity and blocking consumers until items arrive. See the docs for more information and run searches for samples.
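As a rough illustration of how BlockingCollection could replace the queue in the puller pattern, here is a sketch with a bounded producer and several consumers (class and method names are mine; InspectBlobAsync is again a Task.Delay stand-in for the real work):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Hypothetical stand-in for the real Blob inspection work.
    static async Task InspectBlobAsync(string name, CancellationToken cancelToken)
    {
        await Task.Delay(10, cancelToken);
    }

    public static async Task<int> RunAsync(string[] nameSource, CancellationToken cancelToken)
    {
        int processed = 0;
        using (var names = new BlockingCollection<string>(boundedCapacity: 100))
        {
            // Producer: push the names; CompleteAdding tells the consumers no more are coming.
            var producer = Task.Run(() =>
            {
                foreach (var name in nameSource)
                {
                    names.Add(name, cancelToken);   // blocks if the collection is full
                }
                names.CompleteAdding();
            });

            // Consumers: GetConsumingEnumerable blocks until an item arrives and
            // ends when adding is complete and the collection is drained.
            var consumers = Enumerable.Range(1, 4).Select(_ => Task.Run(async () =>
            {
                foreach (var name in names.GetConsumingEnumerable(cancelToken))
                {
                    await InspectBlobAsync(name, cancelToken);
                    Interlocked.Increment(ref processed);
                }
            }));

            await Task.WhenAll(consumers.Concat(new[] { producer }));
        }
        return processed;
    }
}
```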


UPDATE — Microsoft throttling example

The Throttling section of the Microsoft Learn article Consuming the Task-based Asynchronous Pattern provides an example of how to throttle a large number of asynchronous operations.

The throttling technique is based upon a List of Tasks that is initially loaded with a starter set, up to the throttle limit. Then, as each Task completes, it is removed from the List and one of the remaining operations is started and added in its place. In a sense it's using the collection as an internal queue.
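A sketch of the pattern as I read it looks like the following; the original article's code differs in its details, and InspectBlobAsync here is again a hypothetical Task.Delay stand-in:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyThrottleDemo
{
    // Hypothetical stand-in for the real Blob inspection work.
    static async Task InspectBlobAsync(string name, CancellationToken cancelToken)
    {
        await Task.Delay(10, cancelToken);
    }

    public static async Task<int> RunAsync(IEnumerable<string> nameSource, int throttle, CancellationToken cancelToken)
    {
        int processed = 0;
        var pending = new List<Task>();
        using (var names = nameSource.GetEnumerator())
        {
            // Load the starter set, up to the throttle limit.
            while (pending.Count < throttle && names.MoveNext())
            {
                pending.Add(InspectBlobAsync(names.Current, cancelToken));
            }

            // As each Task completes, remove it and start one of the remaining operations.
            while (pending.Count > 0)
            {
                var completed = await Task.WhenAny(pending);
                pending.Remove(completed);
                await completed;               // rethrows if the Task faulted
                processed++;
                if (names.MoveNext())
                {
                    pending.Add(InspectBlobAsync(names.Current, cancelToken));
                }
            }
        }
        return processed;
    }
}
```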

The code is a little verbose and not especially clear to read, but it does work correctly. The sample code does not mention cancellation, and I haven't had time to test different error handling or cancellation scenarios. I'll leave that as an exercise for the reader and post an update if I learn anything useful in the future.

