Partition: How to add a wait after every partition

Here is a RateLimiter class that you could use to limit the frequency of the asynchronous operations. It is a simplified version of the RateLimiter class found in this answer.

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private void ScheduleSemaphoreRelease()
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await Task.Delay(_timeUnit).ConfigureAwait(false); }
            finally { _semaphore.Release(); }
        });
    }
}

Usage example:

List<string> urls = GetUrls();

// This RateLimiter does not implement IDisposable, so no 'using' here.
var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));


Note: This implementation is leaky, in the sense that it internally starts asynchronous Task.Delay operations that cannot be canceled when you are finished using the RateLimiter. Any pending Task.Delay operation keeps the RateLimiter from being garbage collected in a timely manner, and also consumes the resources associated with active Task.Delay tasks. Also, the SemaphoreSlim is not disposed as it should be. These are minor flaws that are unlikely to affect a program that creates only a handful of RateLimiters. If you intend to create a lot of them, you could take a look at the 3rd revision of this answer, which features a disposable RateLimiter.
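
For illustration, here is one way the leak described in the note could be addressed. This is only a minimal sketch and not the revision mentioned above: the class name DisposableRateLimiter and the _disposeCts and _disposed fields are assumptions introduced here. The idea is to pass a CancellationToken to each Task.Delay, so that Dispose can cancel all the pending delays before disposing the SemaphoreSlim.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical disposable variant (names are illustrative assumptions).
public sealed class DisposableRateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts = new CancellationTokenSource();
    private int _disposed; // 0 = active, 1 = disposed

    public DisposableRateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        _ = ReleaseAfterDelayAsync(); // Fire and forget; exceptions are handled inside
    }

    private async Task ReleaseAfterDelayAsync()
    {
        try
        {
            // The delay is canceled as soon as the limiter is disposed.
            await Task.Delay(_timeUnit, _disposeCts.Token).ConfigureAwait(false);
            _semaphore.Release();
        }
        catch (OperationCanceledException) { } // Disposed while waiting; nothing to release
        catch (ObjectDisposedException) { }    // Lost a race with Dispose
    }

    public void Dispose()
    {
        // Make repeated Dispose calls harmless.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;
        _disposeCts.Cancel();
        _disposeCts.Dispose();
        _semaphore.Dispose();
    }
}
```

With a class like this, a `using var rateLimiter = new DisposableRateLimiter(20, TimeSpan.FromMinutes(1.0));` declaration would compile, and the pending delays would be canceled at the end of the enclosing scope. Calling WaitAsync after Dispose throws an ObjectDisposedException.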


Here is an alternative, more complex implementation of the RateLimiter class, which is based on the Environment.TickCount64 property instead of a SemaphoreSlim. Its advantage is that it doesn't launch hidden asynchronous operations in the background. Its disadvantages are that the WaitAsync method does not accept a CancellationToken argument, and that the added complexity makes bugs more likely.

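using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public class RateLimiter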
{
    private readonly Queue<long> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly int _timeUnitMilliseconds;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _queue = new Queue<long>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
    }

    public Task WaitAsync()
    {
        int delayMilliseconds = 0;
        lock (_queue)
        {
            long currentTimestamp = Environment.TickCount64;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                long refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                Debug.Assert(delayMilliseconds >= 0);
                if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delayMilliseconds
                + _timeUnitMilliseconds);
        }
        if (delayMilliseconds == 0) return Task.CompletedTask;
        return Task.Delay(delayMilliseconds);
    }
}
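
To make the queue arithmetic of the class above easier to follow, here is a small deterministic sketch that replays the same logic with explicit timestamps instead of Environment.TickCount64. The SlidingWindowTrace class and its ComputeDelays method are hypothetical helpers introduced only for this illustration. Each queue entry is the timestamp at which a previously granted slot expires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that mirrors the WaitAsync logic with an injected clock.
public static class SlidingWindowTrace
{
    // Returns the delay (in milliseconds) assigned to each call, given the
    // timestamps (in milliseconds, non-decreasing) at which the calls arrive.
    public static int[] ComputeDelays(int maxActionsPerTimeUnit,
        int timeUnitMilliseconds, long[] callTimestamps)
    {
        var queue = new Queue<long>(); // Expiration timestamps of granted slots
        var delays = new int[callTimestamps.Length];
        for (int i = 0; i < callTimestamps.Length; i++)
        {
            long now = callTimestamps[i];
            // Drop the slots that have already expired.
            while (queue.Count > 0 && queue.Peek() < now) queue.Dequeue();
            int delay = 0;
            if (queue.Count >= maxActionsPerTimeUnit)
            {
                // Wait until the slot that is maxActionsPerTimeUnit positions
                // from the end of the queue expires.
                long refTimestamp = queue
                    .Skip(queue.Count - maxActionsPerTimeUnit).First();
                delay = checked((int)(refTimestamp - now));
            }
            queue.Enqueue(now + delay + timeUnitMilliseconds);
            delays[i] = delay;
        }
        return delays;
    }
}
```

For example, with a limit of 2 actions per 1000 milliseconds and five calls that all arrive at timestamp 0, `SlidingWindowTrace.ComputeDelays(2, 1000, new long[] { 0, 0, 0, 0, 0 })` yields the delays 0, 0, 1000, 1000, 2000: the first two calls proceed immediately, the next two wait for the first window to expire, and the fifth waits for the second window.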
