How to call back async function from Rx Subscribe?

Ana Betts’ answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish, you need something like this:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(l => Observable.FromAsync(asyncMethod))
        .Concat()
        .Subscribe();

Or:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
        .Concat()
        .Subscribe();

What’s a good way to run periodic tasks using Rx, with a single concurrent execution restriction?

You are on the right track: you can use Select + Concat to flatten the observable and limit the number of in-flight requests. (Note: if your task takes longer than the interval time, executions will start to stack up, since they can’t execute fast enough.)

    var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) //I assume you are …

How can I implement an exhaustMap handler in Rx.Net?

Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it’s still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the …
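
The description above can be sketched roughly as follows. This is a minimal sketch, not necessarily the answer's exact code: the class name ExhaustMapSketch and the Wrap helper are hypothetical, and the final Concat step is an assumption, since the excerpt is cut off before it names the last operator. Wrap is there so that every new task is a distinct object; otherwise two already-completed cached tasks could compare equal and be dropped by DistinctUntilChanged.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ExhaustMapSketch
{
    // Ignores source items that arrive while the task started
    // for an earlier item is still running.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> function)
    {
        return source
            // Carry the most recent task forward: reuse it while it is
            // still running, otherwise start a new one for this item.
            .Scan(Task.FromResult(default(TResult)), (previous, item) =>
                previous.IsCompleted ? Wrap(function(item)) : previous)
            // Consecutive repeats of the same task object are dropped
            // (the default comparer uses reference equality for Task).
            .DistinctUntilChanged()
            // Assumption: flatten by awaiting each remaining task in order.
            .Concat();

        // Hypothetical helper: guarantees a fresh Task instance per call.
        async Task<TResult> Wrap(Task<TResult> task) => await task;
    }
}
```

Used as `source.ExhaustMap(x => SomeAsyncCall(x))`, where SomeAsyncCall is a placeholder for your own method, items that arrive while a call is in progress are skipped rather than queued. If the task faults, the error propagates through Concat to the subscriber.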