I was able to reproduce the issue you are observing. It is clearly a flaw in the Channels library IMHO. Here is my repro:

    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class Program
    {
        public static async Task Main()
        {
            var channel = Channel.CreateUnbounded<int>();
            var bufferBlock = new BufferBlock<int>();
            var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
            var mem0 = GC.GetTotalMemory(true);
            int timeouts = 0;
            for (int i = 0; i < 10; i++)
            {
                var stopwatch = Stopwatch.StartNew();
                while (stopwatch.ElapsedMilliseconds < 500)
                {
                    using var cts = new CancellationTokenSource(1);
                    try
                    {
                        await channel.Reader.ReadAsync(cts.Token);
                        //await bufferBlock.ReceiveAsync(cts.Token);
                        //await asyncCollection.TakeAsync(cts.Token);
                    }
                    catch (OperationCanceledException) { timeouts++; }
                }
                var mem1 = GC.GetTotalMemory(true);
                Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                    + $" Allocated: {mem1 - mem0:#,0} bytes");
            }
        }
    }

Output:

     1) Timeouts:   124, Allocated: 175,664 bytes
     2) Timeouts:   250, Allocated: 269,720 bytes
     3) Timeouts:   376, Allocated: 362,544 bytes
     4) Timeouts:   502, Allocated: 453,264 bytes
     5) Timeouts:   628, Allocated: 548,080 bytes
     6) Timeouts:   754, Allocated: 638,800 bytes
     7) Timeouts:   880, Allocated: 729,584 bytes
     8) Timeouts: 1,006, Allocated: 820,304 bytes
     9) Timeouts: 1,132, Allocated: 919,216 bytes
    10) Timeouts: 1,258, Allocated: 1,011,928 bytes

Around 800 bytes are leaked per canceled operation, which is quite nasty. The memory is reclaimed every time a new value is written to the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values only sporadically, it can be a showstopper.
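To check the reclamation claim on your own machine, here is a minimal sketch that measures memory after a batch of canceled reads and again after a single write. The `ReclaimDemo` class and its `CanceledReadsAsync` helper are names I made up for illustration, and the batch size of 200 is arbitrary. The numbers will vary with the runtime version and the GC, so treat this as an experiment rather than a proof.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReclaimDemo
{
    // Hypothetical helper: issues `count` reads that are each canceled
    // after about 1 ms, and returns how many of them were canceled.
    public static async Task<int> CanceledReadsAsync(
        ChannelReader<int> reader, int count)
    {
        int canceled = 0;
        for (int i = 0; i < count; i++)
        {
            using var cts = new CancellationTokenSource(1);
            try { await reader.ReadAsync(cts.Token); }
            catch (OperationCanceledException) { canceled++; }
        }
        return canceled;
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        long mem0 = GC.GetTotalMemory(true);

        // Nothing is ever written here, so every read ends in cancellation.
        int canceled = await CanceledReadsAsync(channel.Reader, 200);
        long mem1 = GC.GetTotalMemory(true);

        // Write a single value and read it back. Per the observation above,
        // this is the point at which the retained memory should be released.
        channel.Writer.TryWrite(42);
        int value = await channel.Reader.ReadAsync();
        long mem2 = GC.GetTotalMemory(true);

        Console.WriteLine($"Canceled reads: {canceled}, value read: {value}");
        Console.WriteLine($"After canceled reads: {mem1 - mem0:#,0} bytes");
        Console.WriteLine($"After one write:      {mem2 - mem0:#,0} bytes");
    }
}
```

If the second figure is much smaller than the first, the write has released the memory held by the canceled reads.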
There are other asynchronous queue implementations available that do not suffer from the same issue. You can try commenting out the await channel.Reader.ReadAsync(cts.Token); line and uncommenting either of the two lines below it. You will see that both the BufferBlock<T> from the TPL Dataflow library and the AsyncCollection<T> from the Nito.AsyncEx.Coordination package allow asynchronous retrieval from the queue with a timeout, without leaking memory.
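If you go with the BufferBlock<T>, the timeout logic can be wrapped in a small helper so that callers don't have to catch the exception themselves. This is only a sketch: `TryReceiveAsync` and the `BufferBlockTimeoutDemo` class are hypothetical names of mine, not part of the TPL Dataflow API. The only library calls used are `ReceiveAsync(CancellationToken)` and `Post`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockTimeoutDemo
{
    // Hypothetical helper: waits up to `timeout` for an item.
    // Returns (true, item) on success, or (false, default) on timeout.
    public static async Task<(bool Success, T Item)> TryReceiveAsync<T>(
        ISourceBlock<T> source, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            T item = await source.ReceiveAsync(cts.Token);
            return (true, item);
        }
        catch (OperationCanceledException)
        {
            // The token fired before any item became available.
            return (false, default);
        }
    }

    public static async Task Main()
    {
        var block = new BufferBlock<int>();

        // The block is empty, so this attempt should time out.
        var miss = await TryReceiveAsync(block, TimeSpan.FromMilliseconds(50));
        Console.WriteLine($"Empty block: success = {miss.Success}");

        // After posting a value, the next attempt should return it.
        block.Post(7);
        var hit = await TryReceiveAsync(block, TimeSpan.FromMilliseconds(50));
        Console.WriteLine($"After Post: success = {hit.Success}, item = {hit.Item}");
    }
}
```

Note that `ReceiveAsync` throws an `InvalidOperationException` if the block is completed while empty. The helper above deliberately lets that propagate, because it is not a timeout.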