ForEachAsync with Result

Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]>, based on this API. Here is an implementation:

/// <summary>
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

This implementation supports all the options and the functionality of the Parallel.ForEachAsync overload that has an IEnumerable<T> as source. Its behavior in case of errors and cancellation is identical. The results are arranged in the same order as the associated elements in the source sequence.

Leave a Comment