Read and process files in parallel C#

It looks like your application’s performance is mostly limited by IO. However, you still have a bit of CPU-bound work in your code. These two bits of work are interdependent: your CPU-bound work cannot start until the IO has done its job, and the IO does not move on to the next work item until your CPU has finished with the previous one. They’re both holding each other up. Therefore, it is possible (explained at the very bottom) that you will see an improvement in throughput if you perform your IO- and CPU-bound work in parallel, like so:

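// Requires: System, System.Collections.Concurrent, System.IO,
// System.Text.RegularExpressions, System.Threading.Tasks.
// 'Instruction' and 'instructions' are not defined here; they are
// assumed to come from your existing code.
void ReadAndProcessFiles(string[] filePaths)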
{
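    // Our thread-safe collection used for the handover.
    // It is unbounded by default; pass a boundedCapacity to the constructor
    // if stage 1 can outpace stage 2 and memory use matters.
    var lines = new BlockingCollection<string>();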

    // Build the pipeline.
    var stage1 = Task.Run(() =>
    {
        try
        {
            foreach (var filePath in filePaths)
            {
                using (var reader = new StreamReader(filePath))
                {
                    string line;

                    while ((line = reader.ReadLine()) != null)
                    {
                        // Hand over to stage 2 and continue reading.
                        lines.Add(line);
                    }
                }
            }
        }
        finally
        {
            lines.CompleteAdding();
        }
    });

    var stage2 = Task.Run(() =>
    {
        // Process lines on a ThreadPool thread
        // as soon as they become available.
        foreach (var line in lines.GetConsumingEnumerable())
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");
                    instructions.Add(instruction);
                }
            }
        }
    });

    // Block until both tasks have completed.
    // Blocking ties up the calling thread and can deadlock if that thread
    // is one the tasks end up needing (a UI thread, for example).
    // Consider using 'await Task.WhenAll' instead.
    Task.WaitAll(stage1, stage2);
}

I highly doubt that it’s your CPU work holding things up, but if it happens to be the case, you can also parallelise stage 2 like so:

    var stage2 = Task.Run(() =>
    {
        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        Parallel.ForEach(lines.GetConsumingEnumerable(), parallelOptions, line =>
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
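                    Console.WriteLine("computing...");

                    // Parallel.ForEach runs this body on several threads at once.
                    // List<T>.Add is not thread-safe, so guard the call (or make
                    // 'instructions' a ConcurrentBag<Instruction>).
                    lock (instructions)
                    {
                        instructions.Add(instruction);
                    }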
                }
            }
        });
    });
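
One more thing worth knowing about the parallel version: when Parallel.ForEach is handed a plain IEnumerable, its default partitioner pulls items in chunks, which does not sit well with a blocking producer/consumer source. Below is a minimal sketch of how I would probably wire it up instead, assuming .NET 4.5 or later. ParallelStage, ProcessInParallel and the parse delegate are names I made up for illustration; parse stands in for your Regex/Instruction code. Results go into a ConcurrentBag, so their order is not preserved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class ParallelStage
{
    // Hypothetical helper: drains 'lines' on several threads and collects
    // whatever 'parse' returns for each line.
    public static ConcurrentBag<TResult> ProcessInParallel<TResult>(
        BlockingCollection<string> lines,
        Func<string, TResult> parse)
    {
        // Thread-safe, unordered result collection.
        var results = new ConcurrentBag<TResult>();

        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        // NoBuffering makes each worker take one item at a time instead of
        // waiting for a whole chunk of lines to arrive.
        var source = Partitioner.Create(
            lines.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, options, line => results.Add(parse(line)));

        return results;
    }
}
```

Stage 1 stays exactly as it was; the call only returns once lines.CompleteAdding() has been called and the collection has been drained.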

Mind you, if your CPU work component is negligible in comparison to the IO component, you won’t see much speed-up. The more evenly the work is split between IO and CPU, the better the pipeline will perform in comparison with sequential processing.
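
If you want a rough idea of where your time goes before restructuring anything, something like the following sketch may help. It times a read-only pass and a read-and-process pass over the same files. WorkloadProbe, Time, Compare and processLine are hypothetical names, and processLine stands in for your parsing code. Treat the numbers as indicative only: the OS file cache will usually make whichever pass runs second look faster on the IO side, so do a warm-up run first.

```csharp
using System;
using System.Diagnostics;
using System.IO;

static class WorkloadProbe
{
    // Hypothetical helper: returns how long 'work' takes to run once.
    public static TimeSpan Time(Action work)
    {
        var stopwatch = Stopwatch.StartNew();
        work();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    // Times a pass that only reads lines, then a pass that reads and processes them.
    public static (TimeSpan ReadOnly, TimeSpan ReadAndProcess) Compare(
        string[] filePaths,
        Action<string> processLine)
    {
        var readOnly = Time(() => ForEachLine(filePaths, _ => { }));
        var readAndProcess = Time(() => ForEachLine(filePaths, processLine));
        return (readOnly, readAndProcess);
    }

    // Sequentially feeds every line of every file to 'action'.
    static void ForEachLine(string[] filePaths, Action<string> action)
    {
        foreach (var filePath in filePaths)
        {
            foreach (var line in File.ReadLines(filePath))
            {
                action(line);
            }
        }
    }
}
```

If the two timings are close, the work is IO-dominated and the pipeline is unlikely to buy you much.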

Since we’re talking about performance, note that I am not particularly thrilled about the number of blocking calls in the above code. If I were doing this in my own project, I would have gone the async/await route. I chose not to do so in this case because I wanted to keep things easy to understand and easy to integrate.
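
For the curious, here is a minimal sketch of what the async/await route could look like. It is one possible shape, not the only one, and it makes a few assumptions: C# 8 and .NET Core 3.0 or later (for await foreach), and System.Threading.Channels for the handover in place of BlockingCollection. AsyncPipeline, ReadAndProcessFilesAsync and processLine are hypothetical names; processLine stands in for your parsing code.

```csharp
using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

static class AsyncPipeline
{
    // Hypothetical helper: reads every line of every file and hands it to
    // 'processLine' without blocking a thread while waiting on IO.
    public static async Task ReadAndProcessFilesAsync(
        string[] filePaths,
        Action<string> processLine)
    {
        // Unbounded, like the default BlockingCollection above.
        var lines = Channel.CreateUnbounded<string>();

        var stage1 = Task.Run(async () =>
        {
            try
            {
                foreach (var filePath in filePaths)
                {
                    using (var reader = new StreamReader(filePath))
                    {
                        string line;

                        while ((line = await reader.ReadLineAsync()) != null)
                        {
                            // Hand over to stage 2 and continue reading.
                            await lines.Writer.WriteAsync(line);
                        }
                    }
                }
            }
            finally
            {
                // Lets stage 2 finish once the channel has been drained.
                lines.Writer.Complete();
            }
        });

        var stage2 = Task.Run(async () =>
        {
            // Process lines as soon as they become available.
            await foreach (var line in lines.Reader.ReadAllAsync())
            {
                processLine(line);
            }
        });

        // No thread is blocked while we wait for both stages.
        await Task.WhenAll(stage1, stage2);
    }
}
```

The caller simply awaits ReadAndProcessFilesAsync(filePaths, ...). With a single consumer in stage 2, lines are processed in the order they were read.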
