Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]>, based on this API. Here is an implementation:
// Requires the System.Diagnostics namespace (Debug, UnreachableException) in addition to
// System.Linq, System.Threading and System.Threading.Tasks.
// Note: UnreachableException is available from .NET 7 onward.

/// <summary>
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    // Preallocate when the element count is known without enumerating the source.
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    // Pair each element with its index, so that its result can be stored in source order.
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            // Iterations may complete out of order: grow the list up to this index first.
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        // Propagate the outcome of the loop (results, all exceptions, or cancellation).
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                // The TaskCanceledException is used only to extract the token stored in the task.
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
This implementation supports all the options and the functionality of the Parallel.ForEachAsync overload that has an IEnumerable<T> as source. Its behavior in case of errors and cancellation is identical. The results are arranged in the same order as the associated elements in the source sequence.
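
A minimal usage sketch follows, to illustrate the ordering guarantee. It assumes the method above is placed in a static class, here given the hypothetical name ParallelEx, and that the project targets .NET 7 or later (because of UnreachableException). The input values, the delays and the "Item N" strings are made up for the demonstration. Earlier items are deliberately given longer delays, so iterations finish out of source order while the returned array still follows it.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] ids = { 1, 2, 3, 4, 5 };
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };

// Earlier items wait longer, so iterations complete out of source order.
string[] labels = await ParallelEx.ForEachAsync(ids, options, async (id, ct) =>
{
    await Task.Delay(50 * (6 - id), ct);
    return $"Item {id}";
});

Console.WriteLine(string.Join(", ", labels));
// Prints: Item 1, Item 2, Item 3, Item 4, Item 5

// Hypothetical container class; the method is the implementation shown above.
static class ParallelEx
{
    public static Task<TResult[]> ForEachAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(parallelOptions);
        ArgumentNullException.ThrowIfNull(body);
        List<TResult> results = new();
        if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
        IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
        return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
        {
            (TSource item, int index) = entry;
            TResult result = await body(item, ct).ConfigureAwait(false);
            lock (results)
            {
                // Store each result at the index of its source element.
                while (results.Count <= index) results.Add(default);
                results[index] = result;
            }
        }).ContinueWith(t =>
        {
            TaskCompletionSource<TResult[]> tcs = new();
            switch (t.Status)
            {
                case TaskStatus.RanToCompletion:
                    lock (results) tcs.SetResult(results.ToArray()); break;
                case TaskStatus.Faulted:
                    tcs.SetException(t.Exception.InnerExceptions); break;
                case TaskStatus.Canceled:
                    tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
                default: throw new UnreachableException();
            }
            return tcs.Task;
        }, default, TaskContinuationOptions.DenyChildAttach |
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
    }
}
```

The MaxDegreeOfParallelism value of 2 is arbitrary; any value accepted by ParallelOptions should work the same way, since each result is stored by the index of its source element rather than by completion order.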