Process files concurrently and asynchronously

For this kind of job, a powerful tool you could use is the TPL Dataflow library. With this tool you can create a processing pipeline consisting of many linked blocks, with the data flowing from the first block to the last (cycles and meshes are also possible).

The advantages of this approach are:

  1. You get data-parallelism on top of task-parallelism. All blocks work concurrently and independently of each other.
  2. You can optimally configure the level of concurrency (a.k.a. degree of parallelism) of each heterogeneous operation. For example, doing API calls may be highly parallelizable, while reading from the hard disk may not be parallelizable at all.
  3. You get advanced options out of the box (BoundedCapacity, CancellationToken and others).
  4. You get built-in support for both synchronous and asynchronous delegates.

Below is how you could rewrite your original code in TPL Dataflow terms. Three blocks are used: two TransformManyBlocks and one ActionBlock.

using System.IO;
using System.Threading.Tasks.Dataflow;

var directoryBlock = new TransformManyBlock<DirectoryConfig, string>(config =>
{
    return Directory.GetFiles(config.DirectoryPath, config.Token);
});

var fileBlock = new TransformManyBlock<string, string>(filePath =>
{
    return File.ReadLines(filePath);
});

var lineBlock = new ActionBlock<string>(async line =>
{
    await Task.Delay(1000); // Simulate processing a single line
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 4
});

directoryBlock.LinkTo(fileBlock, new DataflowLinkOptions { PropagateCompletion = true });
fileBlock.LinkTo(lineBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (DirectoryConfig config in GetDirectoryConfigs())
    await directoryBlock.SendAsync(config);

directoryBlock.Complete();
await lineBlock.Completion;

This example is not very good, since all the work is done by the last block (the lineBlock), while the first two blocks are doing essentially nothing. It is also not memory-efficient, since all lines of all files of all directories will soon become queued in the input buffer of the ActionBlock, unless processing the lines happens to be faster than reading them from the disk. You'll need to configure the blocks with the BoundedCapacity option to solve this problem.
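For example, a bounded version of the fileBlock could look like the sketch below (the capacity value of 10 is an arbitrary illustration, not a tuned recommendation):

```csharp
using System.IO;
using System.Threading.Tasks.Dataflow;

var boundedFileBlock = new TransformManyBlock<string, string>(
    filePath => File.ReadLines(filePath),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10 // At most 10 messages buffered at a time (arbitrary value)
    });
```

With a bounded capacity, SendAsync asynchronously waits for free space in the target's buffer, while Post returns false immediately when the buffer is full, which is why SendAsync is the right method for feeding a bounded pipeline.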

Another thing this example doesn't demonstrate is how you could have different blocks for different types of files, and link the directoryBlock to all of them using a different filtering predicate for each link:

directoryBlock.LinkTo(csvBlock, filePath => Path.GetExtension(filePath) == ".csv");
directoryBlock.LinkTo(xlsBlock, filePath => Path.GetExtension(filePath) == ".xls");
directoryBlock.LinkTo(generalFileBlock); // Anything that is neither csv nor xls
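The csvBlock, xlsBlock and generalFileBlock above are assumed to be blocks that accept a file path as input, for example (the processing logic is a hypothetical placeholder):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var csvBlock = new ActionBlock<string>(filePath =>
{
    Console.WriteLine($"Parsing CSV file: {filePath}"); // Hypothetical placeholder
});
```

Note that a message that matches no link predicate stays in the source block's output buffer forever and prevents the block from completing, which is why the unfiltered generalFileBlock link at the end is important.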

There are also other types of blocks you could use, like the TransformBlock and the BatchBlock. The TPL Dataflow is based on the Task Parallel Library (TPL), and it is essentially a high-level task generator that creates and controls the lifecycle of the tasks needed to process a workload of a given type, based on declarative configuration. It is built into .NET Core, and available as a package (System.Threading.Tasks.Dataflow) for .NET Framework.
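As a sketch, a BatchBlock could be inserted before the line-processing block to group individual lines into arrays, so that they are processed in chunks (the batch size of 100 is an arbitrary illustration):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<string>(100); // Emits arrays of 100 lines each

var batchProcessor = new ActionBlock<string[]>(lines =>
{
    Console.WriteLine($"Processing a batch of {lines.Length} lines");
});

batchBlock.LinkTo(batchProcessor, new DataflowLinkOptions { PropagateCompletion = true });
```

Batching can reduce per-message overhead when the processing of each line is cheap, for example when writing the results to a database in bulk.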
