TPL Dataflow, guarantee completion only when ALL source data blocks completed

The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into “finishing mode”: it will process remaining items in its input queue, but it won’t accept any new items. There is a simpler fix than splitting your processor block in two though: don’t set PropagateCompletion, … Read more

TPL Dataflow, whats the functional difference between Post() and SendAsync()?

To see the difference, you need a situation where blocks will postpone their messages. In this case, Post will return false immediately, whereas SendAsync will return a Task that will be completed when the block decides what to do with the message. The Task will have a true result if the message is accepted, and … Read more

Where can I find a TPL dataflow version for 4.0?

I wrote Steve from the TPL dataflow team about this problem and he responded me with the following download link: http://download.microsoft.com/download/F/9/6/F967673D-58D6-4E3F-8CA9-11769A0A63B1/TPLDataflow.msi This is a CTP version, but the date matches the Nuget package with version number 4.0, so I think it’s the latest version that was compiled against .NET 4.0.

Batching on duration or threshold using TPL Dataflow

Buffering by count and duration is already available through the System.Reactive and specifically, the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires. Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods. … Read more

TPL Dataflow exception in transform block with bounded capacity

This is expected behavior. If there’s a fault “downstream”, the error does not propagate “backwards” up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion) and resolve it. If you want to propagate errors backwards, you could have an await or continuation on process_block.Completion that faults the upstream block(s) if … Read more

BroadcastBlock with guaranteed delivery in TPL Dataflow

It is fairly simple to build what you’re asking using ActionBlock and SendAsync(), something like: public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>( IEnumerable<ITargetBlock<T>> targets) { var targetsList = targets.ToList(); return new ActionBlock<T>( async item => { foreach (var target in targetsList) { await target.SendAsync(item); } }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); } This is the most … Read more

For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?

The cleanest way to retrieve the output of a TransformBlock is to perform a nested loop using the methods OutputAvailableAsync and TryReceive. It is a bit verbose, so you could consider encapsulating this functionality in an extension method ToListAsync: public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(source); List<T> list = … Read more