Channels with CancellationTokenSource with timeout memory leak after dispose

I was able to reproduce the issue you are observing. It is clearly a flaw in the Channels library IMHO. Here is my repro:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var bufferBlock = new BufferBlock<int>();
        var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
        var mem0 = GC.GetTotalMemory(true);
        int timeouts = 0;
        for (int i = 0; i < 10; i++)
        {
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.ElapsedMilliseconds < 500)
            {
                using var cts = new CancellationTokenSource(1);
                try
                {
                    await channel.Reader.ReadAsync(cts.Token);
                    //await bufferBlock.ReceiveAsync(cts.Token);
                    //await asyncCollection.TakeAsync(cts.Token);
                }
                catch (OperationCanceledException) { timeouts++; }
            }
            var mem1 = GC.GetTotalMemory(true);
            Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                + $" Allocated: {mem1 - mem0:#,0} bytes");
        }
    }
}

Output:

 1) Timeouts:   124, Allocated: 175,664 bytes
 2) Timeouts:   250, Allocated: 269,720 bytes
 3) Timeouts:   376, Allocated: 362,544 bytes
 4) Timeouts:   502, Allocated: 453,264 bytes
 5) Timeouts:   628, Allocated: 548,080 bytes
 6) Timeouts:   754, Allocated: 638,800 bytes
 7) Timeouts:   880, Allocated: 729,584 bytes
 8) Timeouts: 1,006, Allocated: 820,304 bytes
 9) Timeouts: 1,132, Allocated: 919,216 bytes
10) Timeouts: 1,258, Allocated: 1,011,928 bytes

Try it on Fiddle.

Around 800 bytes are leaked per operation, which is quite nasty. The memory is reclaimed every time a new value is written in the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values sporadically, this can be a showstopper.

There are other asynchronous queue implementations available, that do not suffer from the same issue. You can try commenting the await channel.Reader.ReadAsync(cts.Token); line and uncommenting any of the two lines below. You will see that both the BufferBlock<T> from the TPL Dataflow library, and the AsyncCollection<T> from the Nito.AsyncEx.Coordination package, allow asynchronous retrieval from the queue with timeout, without memory leakage.

Leave a Comment