For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?

The cleanest way to retrieve the output of a TransformBlock is to perform a nested loop using the methods OutputAvailableAsync and TryReceive. It is a bit verbose, so you could consider encapsulating this functionality in an extension method ToListAsync:

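using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Extension methods must be declared inside a non-generic static class.
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,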
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    List<T> list = new();
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            list.Add(item);
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

Then you could use the ToListAsync method like this:

private async void ToolStripButtonStart_Click(object sender, EventArgs e) // Event handlers must return void
{
    TransformBlock<string, DataTable> transformBlock = new(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with each dataTable
    }
}

Note: this ToListAsync implementation is destructive, meaning that in case of an error the consumed messages are discarded. To make it non-destructive, just remove the await source.Completion line. In that case you'll have to remember to await the Completion of the block after processing the list of consumed messages; otherwise you won't know whether the TransformBlock failed to process all of its input.
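Here is a minimal sketch of the non-destructive variant, written as a top-level program so that it runs on its own. The local function name ReceiveAllToListAsync and the toy TransformBlock<int, int> are hypothetical; the function is the ToListAsync method above without the await source.Completion line, so the caller awaits the Completion only after processing the list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical non-destructive variant: returns every message received so far
// and does NOT await source.Completion, so the caller must await it separately.
static async Task<List<T>> ReceiveAllToListAsync<T>(IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    List<T> list = new();
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
            list.Add(item);
    }
    return list;
}

TransformBlock<int, int> block = new(x => x * 10);
for (int i = 1; i <= 3; i++) block.Post(i);
block.Complete();

List<int> results = await ReceiveAllToListAsync(block);

// First process the consumed messages...
foreach (int result in results) Console.WriteLine(result);

// ...then await the Completion, to learn whether the block failed.
await block.Completion;
```

The ordering is the design choice here: the messages survive even if the block faults, but awaiting block.Completion last is what surfaces the error.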

Alternative ways to retrieve the output of a dataflow block do exist. For example, this one by dcastro uses a BufferBlock as a buffer and is slightly more performant, but personally I find the approach above to be safer and more straightforward.
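A rough sketch of the general BufferBlock technique (an assumption about the shape of that approach, not dcastro's exact code): link the TransformBlock to a BufferBlock, await the Completion of the TransformBlock, which also propagates a possible error, and then drain the buffer in one call with TryReceiveAll. The int-to-string transform is a hypothetical placeholder.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

TransformBlock<int, string> transformBlock = new(x => $"item{x}");
BufferBlock<string> buffer = new();

// Move every output message of the TransformBlock into the BufferBlock.
transformBlock.LinkTo(buffer);

for (int i = 1; i <= 3; i++) transformBlock.Post(i);
transformBlock.Complete();

// The TransformBlock completes only after its output has been moved to the
// buffer. Awaiting it also rethrows a possible failure of the block.
await transformBlock.Completion;

// Drain the buffer; TryReceiveAll returns false when the buffer is empty.
if (!buffer.TryReceiveAll(out IList<string>? results))
    results = Array.Empty<string>();

foreach (string result in results) Console.WriteLine(result);
```

The link matters: a TransformBlock cannot complete while messages remain in its own output buffer, so awaiting its Completion without anything consuming the output would never finish.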

Instead of waiting for the completion of the block before retrieving the output, you could also retrieve it in a streaming manner, as an IAsyncEnumerable<T> sequence:

// [EnumeratorCancellation] requires: using System.Runtime.CompilerServices;
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}

This way you will be able to get your hands on each DataTable as soon as it has been produced, without having to wait for all the queries to be processed. To consume an IAsyncEnumerable<T>, you simply move the await before the foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with each dataTable
}
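Because the cancellationToken parameter of ToAsyncEnumerable is decorated with [EnumeratorCancellation], a token can also be supplied at the consumption site with WithCancellation. The sketch below repeats the method as a local function so that it runs as a standalone top-level program; the toy block, the delay, and the cancel-after-two-items logic are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Same logic as the ToAsyncEnumerable extension method, as a local function.
static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}

TransformBlock<int, int> block = new(async x => { await Task.Delay(10); return x; });
for (int i = 1; i <= 5; i++) block.Post(i);
block.Complete();

using CancellationTokenSource cts = new();
List<int> received = new();
bool canceled = false;
try
{
    // WithCancellation flows cts.Token into the [EnumeratorCancellation] parameter.
    await foreach (int x in ToAsyncEnumerable(block).WithCancellation(cts.Token))
    {
        received.Add(x);
        if (received.Count == 2) cts.Cancel(); // Stop after the second item
    }
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"Received {received.Count} items, canceled: {canceled}");
```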

Advanced: Below is a more sophisticated version of the ToListAsync method that propagates all the errors of the underlying block, in the same direct way that they are propagated by methods like Task.WhenAll and Parallel.ForEachAsync. The original simple ToListAsync method propagates only the first error, because await rethrows just the first inner exception of the faulted Completion task. All the errors could instead be preserved by wrapping them in a nested AggregateException, using the Wait technique that is shown in this answer.

/// <summary>
/// Asynchronously waits for the successful completion of the specified source, and
/// returns all the received messages. In case the source completes with error,
/// the error is propagated and the received messages are discarded.
/// </summary>
public static Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);

    async Task<List<T>> Implementation()
    {
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken)
            .ConfigureAwait(false))
            while (source.TryReceive(out T item))
                list.Add(item);
        await source.Completion.ConfigureAwait(false);
        return list;
    }

    return Implementation().ContinueWith(t =>
    {
        if (t.IsCanceled) return t;
        Debug.Assert(source.Completion.IsCompleted);
        if (source.Completion.IsFaulted)
        {
            TaskCompletionSource<List<T>> tcs = new();
            tcs.SetException(source.Completion.Exception.InnerExceptions);
            return tcs.Task;
        }
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

.NET 6 update: A new API DataflowBlock.ReceiveAllAsync was introduced in .NET 6, with this signature:

public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);

It is similar to the aforementioned ToAsyncEnumerable method. The important difference is that the new API does not propagate the possible exception of the consumed source block after propagating all of its messages. This behavior is not consistent with the analogous API ReadAllAsync from the Channels library. I have reported this inconsistency on GitHub, and the issue is currently labeled by Microsoft as a bug.
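Given that behavior, a minimal sketch of consuming the output with ReceiveAllAsync and then awaiting the Completion explicitly, so that a possible failure of the block is not silently lost. The squaring TransformBlock<int, int> is a hypothetical placeholder for the real work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

TransformBlock<int, int> block = new(x => x * x);
for (int i = 1; i <= 3; i++) block.Post(i);
block.Complete();

List<int> squares = new();

// ReceiveAllAsync streams the messages, but (as noted above) it does not
// rethrow a fault of the block when the stream ends.
await foreach (int square in block.ReceiveAllAsync())
    squares.Add(square);

// So await the Completion explicitly, to propagate a possible exception.
await block.Completion;

Console.WriteLine(string.Join(", ", squares));
```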
