Batching on duration or threshold using TPL Dataflow

Buffering by count and duration is already available through System.Reactive, specifically its Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires. Dataflow blocks are designed to work with System.Reactive: blocks can be converted to Observables and Observers with the DataflowBlock.AsObservable() and AsObserver() extension methods. … Read more
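
A minimal sketch of the combination described above, assuming the System.Reactive and System.Threading.Tasks.Dataflow packages are referenced. The class name TimedBatching and the method BatchByTimeOrCount are hypothetical names chosen for illustration; only AsObservable(), AsObserver() and Buffer come from the libraries. Buffer emits an empty list when the timespan expires with nothing collected, so the sketch filters those out.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

public static class TimedBatching
{
    // Hypothetical helper: forwards batches from `source` to `target`,
    // closing a batch when `maxCount` items arrive or `maxWait` elapses.
    public static IDisposable BatchByTimeOrCount<T>(
        ISourceBlock<T> source,
        ITargetBlock<IList<T>> target,
        TimeSpan maxWait,
        int maxCount)
    {
        return source.AsObservable()                     // block -> IObservable<T>
                     .Buffer(maxWait, maxCount)          // Rx: count OR timespan
                     .Where(batch => batch.Count > 0)    // drop empty timer ticks
                     .Subscribe(target.AsObserver());    // IObserver<IList<T>> -> block
    }
}
```

Disposing the returned IDisposable stops the forwarding. When the source block completes, Buffer flushes the last partial batch and the observer completes the target block.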

How to throttle multiple asynchronous tasks?

First, abstract away from threads. Especially since your operation is asynchronous, you shouldn’t be thinking about “threads” at all. In the asynchronous world, you have tasks, and you can have a huge number of tasks compared to threads. Throttling asynchronous code can be done using SemaphoreSlim: static async Task DoSomething(int n); static void RunConcurrently(int total, … Read more
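
The SemaphoreSlim approach mentioned above can be sketched as follows. The excerpt's own code is truncated, so this is not a reconstruction of it: the class Throttling and the method ForEachThrottledAsync are hypothetical names, and the shape of the helper is an assumption. Each task awaits the semaphore before starting its operation, so at most maxConcurrency operations are in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttling
{
    // Hypothetical helper: runs `operation` for every item, with at most
    // `maxConcurrency` operations in flight at any moment.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> operation)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        async Task RunOneAsync(T item)
        {
            // Asynchronously wait for a free slot; no thread is blocked here.
            await gate.WaitAsync().ConfigureAwait(false);
            try
            {
                await operation(item).ConfigureAwait(false);
            }
            finally
            {
                gate.Release(); // free the slot even if the operation throws
            }
        }

        // ToList() starts every task; WhenAll waits for all of them,
        // so the semaphore is only disposed once nothing uses it.
        var tasks = items.Select(item => RunOneAsync(item)).ToList();
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

A call such as `await Throttling.ForEachThrottledAsync(urls, 4, DownloadAsync)` (with `urls` and `DownloadAsync` supplied by the caller) would create one task per URL but allow only four downloads at a time.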

.NET TPL: Limited Concurrency Level Task scheduler with task priority?

The Parallel Extensions Extras Samples already provide such a scheduler, the QueuedTaskScheduler. This scheduler provides priorities, concurrency limits, fairness and fine-grained control over the type and priorities of the threads used. Of course, you don’t have to use or configure the features you don’t need. Stephen Toub provides a brief description of the various schedulers … Read more

Faulted vs Canceled task status after CancellationToken.ThrowIfCancellationRequested

There are two problems here. First, it’s always a good idea to pass CancellationToken to the Task.Run API, besides making it available to the task’s lambda. Doing so associates the token with the task and is vital for the correct propagation of the cancellation triggered by token.ThrowIfCancellationRequested. This however doesn’t explain why the cancellation status … Read more
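
To illustrate the first point only (the rest of the answer is truncated here), the following sketch compares the final status of a task whose synchronous delegate calls token.ThrowIfCancellationRequested(), with and without the token being passed to Task.Run. The class CancellationStatusDemo and the method Observe are hypothetical names. The delegate is stored as an Action so that the synchronous Task.Run overload is the one used.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationStatusDemo
{
    // Hypothetical helper: returns the final status of a task that is
    // canceled after it has already started running.
    public static TaskStatus Observe(bool passTokenToTaskRun)
    {
        using var cts = new CancellationTokenSource();
        using var started = new ManualResetEventSlim();
        CancellationToken token = cts.Token;

        Action body = () =>
        {
            started.Set();                        // signal that the delegate is running
            token.WaitHandle.WaitOne();           // block until cancellation is requested
            token.ThrowIfCancellationRequested(); // throws OperationCanceledException
        };

        Task task = passTokenToTaskRun
            ? Task.Run(body, token)  // token is associated with the task
            : Task.Run(body);        // task knows nothing about the token

        started.Wait();  // cancel only after the delegate has started
        cts.Cancel();

        try { task.Wait(); }
        catch (AggregateException) { /* expected in both cases */ }

        return task.Status;
    }
}
```

With the token passed, the task ends as Canceled; without it, the same exception leaves the task Faulted, because the task has no associated token to match the exception against.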

Correct way to delay the start of a Task

Like Damien_The_Unbeliever mentioned, the Async CTP includes Task.Delay. Fortunately, we have Reflector: public static class TaskEx { static readonly Task _sPreCompletedTask = GetCompletedTask(); static readonly Task _sPreCanceledTask = GetPreCanceledTask(); public static Task Delay(int dueTimeMs, CancellationToken cancellationToken) { if (dueTimeMs < -1) throw new ArgumentOutOfRangeException("dueTimeMs", "Invalid due time"); if (cancellationToken.IsCancellationRequested) return _sPreCanceledTask; if (dueTimeMs == 0) … Read more
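
On frameworks where Task.Delay is built in (.NET 4.5 and later), no reimplementation is needed. A minimal sketch of delaying the start of some work, assuming async/await is available; the class DelayedStart and the method RunAfterAsync are hypothetical names and not part of the answer excerpted above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DelayedStart
{
    // Hypothetical helper: waits for `delay` without blocking a thread,
    // then runs `work` on the thread pool and returns its result.
    public static async Task<T> RunAfterAsync<T>(
        TimeSpan delay, Func<T> work, CancellationToken cancellationToken = default)
    {
        // Throws TaskCanceledException if the token is canceled during the delay.
        await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
        return await Task.Run(work, cancellationToken).ConfigureAwait(false);
    }
}
```

For example, `await DelayedStart.RunAfterAsync(TimeSpan.FromSeconds(2), () => Compute())` (with `Compute` supplied by the caller) starts the computation roughly two seconds later.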

Should I use ThreadPools or Task Parallel Library for IO-bound operations?

So I decided instead to write tests for this and see how it behaves on practical data. Test Legend Itr: Iteration Seq: Sequential Approach. PrlEx: Parallel Extensions – Parallel.ForEach TPL: Task Parallel Library TPool: ThreadPool Test Results Single-Core CPU [Win7-32] — runs under VMWare — Test Environment: 1 physical cpus, 1 cores, 1 logical cpus. Will be … Read more

Why CancellationToken is separate from CancellationTokenSource?

I was involved in the design and implementation of these classes. The short answer is “separation of concerns”. It is quite true that there are various implementation strategies and that some are simpler at least regarding the type system and initial learning. However, CTS and CT are intended for use in a great many scenarios … Read more

Converting async/await to Task.ContinueWith

I’ve figured out how to do it without async/await or TaskCompletionSource, using nested tasks and Task.Unwrap instead. First, to address @mikez’s comment, here’s GetResponseAsync implementation for .NET 4.0: static public Task<WebResponse> GetResponseTapAsync(this WebRequest request) { return Task.Factory.FromAsync( (asyncCallback, state) => request.BeginGetResponse(asyncCallback, state), (asyncResult) => request.EndGetResponse(asyncResult), null); } Now, here’s GetResponseWithRetryAsync implementation: static Task<HttpWebResponse> GetResponseWithRetryAsync(string url, … Read more
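
The nested-task technique can be shown in isolation with a generic retry helper. This is a sketch of the ContinueWith plus Task.Unwrap pattern only, not the GetResponseWithRetryAsync code from the answer (which is truncated above); the class ContinuationRetry and the method RetryAsync are hypothetical names. The continuation returns a Task<T>, so ContinueWith produces a Task<Task<T>>, and Unwrap flattens it back to a Task<T>.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationRetry
{
    // Hypothetical helper: retries `operation` up to `retries` more times
    // when the returned task does not run to completion. No async/await
    // and no TaskCompletionSource are used.
    public static Task<T> RetryAsync<T>(Func<Task<T>> operation, int retries)
    {
        return operation()
            .ContinueWith(antecedent =>
            {
                if (antecedent.Status == TaskStatus.RanToCompletion || retries <= 0)
                    return antecedent; // success, or out of retries: surface as-is

                // Reading Exception marks the failed attempt as observed.
                var ignored = antecedent.Exception;

                // Start the next attempt; its task becomes the inner task.
                return RetryAsync(operation, retries - 1);
            }, TaskContinuationOptions.ExecuteSynchronously)
            .Unwrap(); // Task<Task<T>> -> Task<T>
    }
}
```

Note that an exception thrown synchronously by `operation` itself, before it returns a task, is not retried in this sketch.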

TPL Dataflow exception in transform block with bounded capacity

This is expected behavior. If there’s a fault “downstream”, the error does not propagate “backwards” up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion) and resolve it. If you want to propagate errors backwards, you could have an await or continuation on process_block.Completion that faults the upstream block(s) if … Read more
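
The continuation-based option described above might look like the following sketch, assuming the System.Threading.Tasks.Dataflow library is available. The class DataflowFaults and the method FaultUpstreamWhenDownstreamFaults are hypothetical names. The blocks are taken as IDataflowBlock because the Fault method is declared on that interface.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowFaults
{
    // Hypothetical helper: when `downstream` faults, fault every block in
    // `upstream` with the same exception so the producers stop as well.
    public static void FaultUpstreamWhenDownstreamFaults(
        IDataflowBlock downstream, params IDataflowBlock[] upstream)
    {
        downstream.Completion.ContinueWith(completion =>
        {
            foreach (IDataflowBlock block in upstream)
            {
                // completion.Exception is non-null because the
                // continuation only runs for a faulted task.
                block.Fault(completion.Exception);
            }
        },
        TaskContinuationOptions.OnlyOnFaulted |
        TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

With a linked pair of blocks, calling `DataflowFaults.FaultUpstreamWhenDownstreamFaults(process_block, sourceBlock)` right after LinkTo (where `sourceBlock` stands for whatever feeds process_block) causes awaiting `sourceBlock.Completion` to surface the downstream failure.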