Friday, April 5, 2024

ParallelForAsync alternatives

The Parallel.For and similar methods available in .NET Framework 4.0+ and .NET Standard 2.0+ are designed to simplify parallelism of CPU-intensive processing. The Task and related classes (introduced in the same .NET Framework 4.0 release), when combined with the async and await keywords that followed in C# 5, facilitate simple coding patterns for fine-grained concurrency. Those class families are intended to help with different types of problems, but there are times when it's convenient to combine them. A typical case I have is querying and updating large numbers of Azure Blobs as efficiently as possible.

NOTE: The Parallel.ForEachAsync family in .NET 6+ provides a simple and convenient way to combine parallelism and fine-grained concurrency, but I couldn't use that method because my library was compelled to target .NET Standard 2.0. This article describes the search for an alternative, older technique.

ForEach and await NO

A naïve code sample might look like this:

Parallel.ForEach(nameSource, async (name) =>
{
    await InspectBlobAsync(name, cancelToken);
});

However, this doesn't behave the way it reads. Parallel.ForEach accepts an Action, so the async lambda is compiled as async void and each iteration returns to the loop at its first incomplete await. The async work for all of the source elements is started almost at once and then runs in parallel, but I can't quickly determine in what order it runs or with what degree of parallelism, which might be chosen by the runtime or operating system (a separate research project). Note that if thousands of Tasks are scheduled, they don't all run at once and create thousands of Threads; they are internally throttled. Even worse, the Parallel.ForEach call is synchronous and blocks the caller while it dispatches the iterations, yet it can return before the awaited work has finished, leaving nothing to await and nowhere to observe exceptions. You might then think of putting the whole thing in a Task.Run(…), which creates more useless Threads that continue after the outer call immediately returns, so it just makes things worse.
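The early return can be made visible with a small sketch. It is only an illustration under stated assumptions: the class and method names are hypothetical, Task.Delay stands in for InspectBlobAsync, and the async Main-free layout is chosen so it compiles with older C# versions.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncVoidDemo
{
    // Returns how many iterations had finished at the moment Parallel.ForEach returned.
    public static int CountCompletedAtReturn(int itemCount, int delayMs)
    {
        int completed = 0;
        Parallel.ForEach(Enumerable.Range(1, itemCount), async i =>
        {
            // Assumption: Task.Delay stands in for the real InspectBlobAsync call.
            await Task.Delay(delayMs);
            Interlocked.Increment(ref completed);
        });
        // The loop has returned, but the code after each await may not have run yet.
        return Volatile.Read(ref completed);
    }

    public static void Main()
    {
        int done = CountCompletedAtReturn(10, 500);
        Console.WriteLine($"Finished when ForEach returned: {done} of 10");
    }
}
```

The count reported is normally far below the number of items, because the loop only waits for each delegate to reach its first await, not for the awaited work.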

Semaphore throttling NO

You will find many articles that use a SemaphoreSlim to throttle a while loop so that a maximum number of Tasks run at once. It's like a gate that opens to let a new Task in as each leaves. I found this technique worked perfectly and planned to release it, but then I discovered that cancellation was a serious problem. I couldn't find a sensible way to signal a cancellation token and get the loop to gracefully exit and ignore waiting Tasks. There may be a way, but it was too much bother so I abandoned the semaphore technique.
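For readers who haven't seen the gate pattern, it usually looks something like the minimal sketch below. This is an illustration, not the code that was abandoned: the class name, the counters and the 20 ms Task.Delay standing in for InspectBlobAsync are all assumptions, and cancellation is deliberately left out because that is the part described above as troublesome. The async Main needs C# 7.1 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreGateDemo
{
    static readonly object maxLock = new object();
    static int running;             // calls currently inside the gate
    public static int MaxRunning;   // highest value of 'running' observed
    public static int Processed;    // calls that ran to completion

    // Hypothetical stand-in for the real blob call: it only waits 20 ms.
    static async Task InspectBlobAsync(string name)
    {
        int now = Interlocked.Increment(ref running);
        lock (maxLock) { if (now > MaxRunning) MaxRunning = now; }
        try { await Task.Delay(20); }
        finally { Interlocked.Decrement(ref running); }
        Interlocked.Increment(ref Processed);
    }

    static async Task ProcessOneAsync(string name, SemaphoreSlim gate)
    {
        try { await InspectBlobAsync(name); }
        finally { gate.Release(); }   // open the gate for the next name
    }

    public static async Task RunAsync(IEnumerable<string> nameSource, int maxConcurrent)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            var tasks = new List<Task>();
            foreach (string name in nameSource)
            {
                await gate.WaitAsync();   // wait until a slot is free
                tasks.Add(ProcessOneAsync(name, gate));
            }
            await Task.WhenAll(tasks);    // every slot is released before the gate is disposed
        }
    }

    public static async Task Main()
    {
        var names = Enumerable.Range(1, 20).Select(i => "blob" + i);
        await RunAsync(names, 4);
        Console.WriteLine($"Processed {Processed}, max concurrent {MaxRunning}");
    }
}
```

The throttle itself is only the WaitAsync and Release pair. Adding a cancel token to WaitAsync raises the question of what to do with the Tasks already in flight when the loop exits, which is where the difficulty starts.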

Task WhenAll NO

The following code seems like a natural simple solution:

var tasks = nameSource.Select(name => InspectBlobAsync(name, cancelToken));
await Task.WhenAll(tasks);

That coding pattern using WhenAll is fabulous at smaller scale, say, when you want to run a handful of concurrent network, file or database calls. If you use this pattern to process hundreds or thousands of async calls then you get the same behaviour as the previous ForEach async approach, that is, all the Tasks are created and started at once and then run unpredictably in parallel.

Since I'm processing many thousands of Blobs, I'm not sure if the creation and scheduling of that many Tasks is harmful or not. I can't find any definitive discussion of this topic. In any case, my instinct said not to use WhenAll, so I looked for a new technique.

ConcurrentQueue YES

I finally decided to use the ConcurrentQueue class. I load the queue with the names of the thousands of Blobs, which I think is acceptable and not resource stressful. I then create some Tasks, each containing a simple while loop which dequeues (aka pulls) the names off the queue and asynchronously processes them. The skeleton code looks like this:

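int concurrentCount = 4;
// Assumption: cts is declared elsewhere (for example as a field) so that other code can call cts.Cancel().
using (cts = new CancellationTokenSource())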
{
  var queue = new ConcurrentQueue<string>(nameSource);
  var pullers = Enumerable.Range(1, concurrentCount).Select(seq => QueuePuller(seq, queue, cts.Token));
  await Task.WhenAll(pullers);
}

async Task QueuePuller(int seq, ConcurrentQueue<string> queue, CancellationToken cancelToken)
{
  while (queue.TryDequeue(out string name))
  {
    try
    {
      await InspectBlobAsync(name, cancelToken);
    }
    catch (OperationCanceledException)
    {
      // Report a cancellation. Break out of the loop and end the puller.
      break;
    }
    catch (Exception ex)
    {
      // Inspect the error and decide to report it or break and end the puller.
    }
  }
}

Although there is more code when using ConcurrentQueue, I feel that it's simple and sensible. The concurrentCount value can be adjusted as needed or set according to the number of cores and it becomes the parallelism throttle (like using a Semaphore, but simpler). Cancellation works simply as well, as signaling the cancel token causes all of the pullers to gracefully exit.
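The cancellation behaviour can be tried out with a runnable variation of the skeleton. This is a sketch under stated assumptions rather than the production code: the class name, the generated blob names, the Processed counter and the 50 ms Task.Delay standing in for InspectBlobAsync are hypothetical, and CancelAfter is used only to simulate someone signaling the token. The async Main needs C# 7.1 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueuePullerDemo
{
    public static int Processed;   // names that were fully processed

    // Hypothetical stand-in for the real blob call: it only waits 50 ms.
    static async Task InspectBlobAsync(string name, CancellationToken cancelToken)
    {
        await Task.Delay(50, cancelToken);
        Interlocked.Increment(ref Processed);
    }

    static async Task QueuePuller(ConcurrentQueue<string> queue, CancellationToken cancelToken)
    {
        while (queue.TryDequeue(out string name))
        {
            try { await InspectBlobAsync(name, cancelToken); }
            catch (OperationCanceledException) { break; }   // end this puller
        }
    }

    // Returns the number of names still queued when all pullers have stopped.
    public static async Task<int> RunAsync(int itemCount, int concurrentCount, int cancelAfterMs)
    {
        Processed = 0;
        using (var cts = new CancellationTokenSource())
        {
            var queue = new ConcurrentQueue<string>(
                Enumerable.Range(1, itemCount).Select(i => "blob" + i));
            cts.CancelAfter(cancelAfterMs);   // simulate a cancel request
            var pullers = Enumerable.Range(1, concurrentCount)
                .Select(_ => QueuePuller(queue, cts.Token))
                .ToList();
            await Task.WhenAll(pullers);
            return queue.Count;
        }
    }

    public static async Task Main()
    {
        int left = await RunAsync(1000, 4, 300);
        Console.WriteLine($"Processed {Processed}, still queued {left}");
    }
}
```

With four pullers, at most four names are in flight when the token is signaled, so the processed and still-queued counts add up to within four of the total. Unprocessed names simply stay on the queue.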

BlockingCollection

It's worth mentioning the BlockingCollection class for more sophisticated scenarios. It can be used in a similar pattern to the ConcurrentQueue class, in that both have items pushed to and pulled from an internal collection, but BlockingCollection provides more control over how this happens, such as an optional bounded capacity and consumers that block until an item arrives or adding is marked complete. See the docs for more information and run searches for samples.
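As a rough idea of the shape, here is a minimal producer and consumer sketch. The class name, the capacity of 100 and the counting "work" are assumptions made for illustration. Note that GetConsumingEnumerable blocks a thread while it waits, so the consumers run inside Task.Run rather than awaiting, which makes this a looser fit for async blob calls than the queue pullers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Pushes itemCount names through a bounded collection and returns how many were consumed.
    public static int Run(int itemCount, int consumerCount)
    {
        int processed = 0;
        using (var names = new BlockingCollection<string>(boundedCapacity: 100))
        {
            // Each consumer blocks until a name arrives or adding is marked complete.
            Task[] consumers = Enumerable.Range(1, consumerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (string name in names.GetConsumingEnumerable())
                    {
                        // Placeholder for the real per-name work.
                        Interlocked.Increment(ref processed);
                    }
                }))
                .ToArray();

            // Add blocks whenever 100 names are already waiting.
            for (int i = 1; i <= itemCount; i++)
            {
                names.Add("blob" + i);
            }
            names.CompleteAdding();   // lets the consumers' loops finish

            Task.WaitAll(consumers);
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine($"Consumed {Run(1000, 4)} names");
    }
}
```

The bounded capacity is the main difference from the queue approach: the producer is slowed to the pace of the consumers instead of loading every name up front.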