Friday, January 24, 2014

Parallel.For with a disposable class

A post to the ozdotnet list in February 2013.

----------------
Chaps, I think I have found the formally correct way of giving each Parallel.ForEach worker its own instance of a disposable class that can't be shared between threads. There is an overload of ForEach that lets you run code at the start and end of each worker (strictly, once for each task that takes part in the loop, so the callbacks are per task rather than per OS thread). Below is a skeleton of my code. It's interesting to put trace output in the 3 callback blocks and watch the lifetime of the workers and the order in which things happen.

The following skeleton runs a parallel loop over an enumerable set of file names to calculate a combined hash of the contents of every file in a folder tree. A small helper class lives for the lifetime of each worker. Each worker accumulates its own sub-hash, which is XORed into the total hash under a lock when the worker finishes.
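
The post doesn't show XorBuffer. Here is a minimal sketch, assuming it simply XORs one equal-length byte array into another in place (the name and argument order come from the skeleton; the body is hypothetical). Because XOR is commutative and associative, the total comes out the same whatever order the workers finish in, which is what makes the per-worker sub-hashes safe to combine:

```csharp
using System;
using System.Linq;

// Example "hashes" (real MD5 hashes are 16 bytes; 2 bytes are used here for brevity).
byte[] h1 = { 0x0F, 0xF0 };
byte[] h2 = { 0x33, 0x33 };
byte[] h3 = { 0xAA, 0x55 };

// Combine in one order...
var total1 = new byte[2];
XorBuffer(total1, h1);
XorBuffer(total1, h2);
XorBuffer(total1, h3);

// ...and in a different order, as workers might finish in any order.
var total2 = new byte[2];
XorBuffer(total2, h3);
XorBuffer(total2, h1);
XorBuffer(total2, h2);

Console.WriteLine(BitConverter.ToString(total1)); // 96-96
Console.WriteLine(total1.SequenceEqual(total2));  // True

// Hypothetical helper: XORs 'source' into 'target' in place.
static void XorBuffer(byte[] target, byte[] source)
{
    if (target.Length != source.Length)
        throw new ArgumentException("Buffers must have the same length.");
    for (int i = 0; i < target.Length; i++)
        target[i] ^= source[i];
}
```

One property of XOR worth knowing: two files with identical contents produce identical hashes, which cancel each other out, so adding a pair of duplicate files leaves the total unchanged.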

The important thing about this parallel loop is that it has 3 callbacks. The first runs when a worker starts, and simply returns a new work data object. The second runs for each item the worker processes: it hashes a file and XORs the result into the work data's sub-hash. The third runs when the worker finishes, and XORs the sub-hash into the total under a lock.
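
To watch the 3 callbacks in action, here is a small self-contained sketch in the spirit of the tracing suggestion earlier. It sums the numbers 1 to 100 instead of hashing files (a stand-in so it runs anywhere) and prints the managed thread ID from the first and third callbacks. The number of workers and their thread IDs vary from run to run; only the total is fixed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int inits = 0, finals = 0, items = 0;
long total = 0;
var totalLock = new object();

Parallel.ForEach<int, long>(
    Enumerable.Range(1, 100),
    // 1. localInit: once per worker task; returns that worker's private subtotal.
    () =>
    {
        Interlocked.Increment(ref inits);
        Console.WriteLine($"init    on thread {Environment.CurrentManagedThreadId}");
        return 0L;
    },
    // 2. body: once per item; touches only the worker's own subtotal, so no lock.
    (n, loopState, subtotal) =>
    {
        Interlocked.Increment(ref items);
        return subtotal + n;
    },
    // 3. localFinally: once per worker task; merges its subtotal under a short lock.
    subtotal =>
    {
        lock (totalLock) { total += subtotal; }
        Interlocked.Increment(ref finals);
        Console.WriteLine($"finally on thread {Environment.CurrentManagedThreadId}, subtotal {subtotal}");
    });

Console.WriteLine($"{inits} workers, {items} items, total {total}");
```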

This pattern is efficient because each worker processes a whole "chunk" of items using only its own state, and each chunk's result is added to the total just once. The lock is held only briefly, once per worker, when the worker finishes and merges its sub-hash into the total hash.

While the parallel loop is running, calling cts.Cancel() (typically from another thread) asynchronously signals the loop to cancel all its workers gracefully, and ForEach then throws an OperationCanceledException that you can catch and report to the caller.
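
A minimal sketch of that cancellation behaviour, assuming a loop over a large range of integers rather than files. To make the demo deterministic, the body itself calls cts.Cancel() after about 1,000 items; in real code the call would come from elsewhere, such as a Stop button handler. The loop stops early and ForEach surfaces a single OperationCanceledException.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var po = new ParallelOptions { CancellationToken = cts.Token };
int started = 0;
bool cancelled = false;

try
{
    Parallel.ForEach(Enumerable.Range(0, 1_000_000), po, i =>
    {
        // Stand-in for an external cancel request, made deterministic for the demo.
        if (Interlocked.Increment(ref started) == 1000)
            cts.Cancel();

        // Same check as the skeleton's body: stop promptly once cancellation is requested.
        po.CancellationToken.ThrowIfCancellationRequested();
        Thread.SpinWait(100); // stand-in for real work such as hashing a file
    });
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"cancelled: {cancelled}, items started: {Volatile.Read(ref started)}");
```

Note that only cancellation is reported this way. Any other exception thrown by a body (for example an IOException from opening a file) reaches the caller wrapped in an AggregateException.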

Remember that the Parallel.ForEach call blocks the calling thread until the loop finishes. In a more realistic scenario you would run the ForEach call inside a Task and await it, but I haven't done that here because it clutters the sample code.
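
One way to make the loop awaitable is to wrap it in Task.Run. The sketch below uses a hypothetical SumSquaresAsync method that applies the same per-worker subtotal pattern to summing squares; the caller stays free while the loop runs on the thread pool.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

long sum = await SumSquaresAsync(Enumerable.Range(1, 1000), CancellationToken.None);
Console.WriteLine(sum); // 333833500

// Hypothetical method: runs a blocking Parallel.ForEach on the thread pool and returns a Task.
static Task<long> SumSquaresAsync(IEnumerable<int> source, CancellationToken token)
{
    return Task.Run(() =>
    {
        long total = 0;
        var totalLock = new object();
        var po = new ParallelOptions { CancellationToken = token };

        Parallel.ForEach<int, long>(source, po,
            () => 0L,                                        // per-worker subtotal
            (n, state, subtotal) => subtotal + (long)n * n,  // no lock needed here
            subtotal => { lock (totalLock) { total += subtotal; } });

        return total;
    }, token);
}
```

Passing the token to both Task.Run and ParallelOptions means a cancellation requested before the task starts skips the work entirely, while one requested during the loop stops it as described above.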

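using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical containing class; SafeEnumerateFiles, XorBuffer and Trace
// are helpers that live elsewhere and are not shown in this post.
public partial class FolderHasher
{
  // Per-worker state. HashAlgorithm instances are not thread-safe,
  // so each worker gets its own MD5 hasher and its own 16-byte sub-hash.
  private sealed class WorkData : IDisposable
  {
    public readonly HashAlgorithm Hasher = MD5.Create();
    public readonly byte[] SubHash = new byte[16];
    public void Dispose() { Hasher.Dispose(); }
  }

  private readonly byte[] totalHash = new byte[16];
  private readonly object totalLock = new object();
  // A field, so other code (e.g. a UI handler) can call cts.Cancel().
  private readonly CancellationTokenSource cts = new CancellationTokenSource();

  public void HashFolder()
  {
    var po = new ParallelOptions() { CancellationToken = cts.Token };
    var filesource = SafeEnumerateFiles(@"C:\temp\testdata");

    try
    {
      Parallel.ForEach<string, WorkData>(filesource, po,
        // 1. localInit: runs when a worker starts; returns its private state.
        () =>
        {
          return new WorkData();
        },
        // 2. body: runs for each file the worker processes.
        (filename, pls, index, data) =>
        {
          po.CancellationToken.ThrowIfCancellationRequested();
          using (var stream = new FileStream(filename, FileMode.Open, FileAccess.Read))
          {
            byte[] hash = data.Hasher.ComputeHash(stream);
            XorBuffer(data.SubHash, hash);
          }
          return data;
        },
        // 3. localFinally: runs when the worker finishes.
        (data) =>
        {
          lock (totalLock)
          {
            XorBuffer(totalHash, data.SubHash);
          }
          data.Dispose(); // no need to hold the lock while disposing
        });
      Trace("Total hash = {0}", BitConverter.ToString(totalHash));
    }
    catch (OperationCanceledException)
    {
      Trace("The parallel loop was cancelled");
    }
  }
}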
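
SafeEnumerateFiles isn't shown in the post either. Judging only by its name, it presumably enumerates files recursively without failing on folders it can't read; the sketch below is a hypothetical implementation on that assumption. It walks the tree with an explicit stack and yields file names lazily, so Parallel.ForEach can start working before the whole tree has been scanned.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Build a small throwaway folder tree to enumerate.
string root = Path.Combine(Path.GetTempPath(), "safeenum-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(Path.Combine(root, "sub", "deeper"));
File.WriteAllText(Path.Combine(root, "a.txt"), "a");
File.WriteAllText(Path.Combine(root, "sub", "b.txt"), "b");
File.WriteAllText(Path.Combine(root, "sub", "deeper", "c.txt"), "c");

List<string> found = SafeEnumerateFiles(root).ToList();
Console.WriteLine(found.Count); // 3
Directory.Delete(root, recursive: true);

// Hypothetical helper: lazily yields every file under 'folder', skipping
// folders that can't be read instead of aborting the whole enumeration.
static IEnumerable<string> SafeEnumerateFiles(string folder)
{
    var pending = new Stack<string>();
    pending.Push(folder);
    while (pending.Count > 0)
    {
        string dir = pending.Pop();
        string[] files, subdirs;
        try
        {
            files = Directory.GetFiles(dir);
            subdirs = Directory.GetDirectories(dir);
        }
        catch (UnauthorizedAccessException) { continue; } // no permission: skip folder
        catch (DirectoryNotFoundException) { continue; }  // deleted meanwhile: skip folder

        foreach (string f in files) yield return f;
        foreach (string d in subdirs) pending.Push(d);
    }
}
```

On .NET Core 2.1 and later, Directory.EnumerateFiles with an EnumerationOptions whose RecurseSubdirectories and IgnoreInaccessible properties are true does much the same job.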
