Does Reactive Extensions support rolling buffers?

This is possible by combining the built-in Window and Throttle methods of Observable. First, let’s solve the simpler problem where we ignore the maximum count condition:

    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
    {
        var closes = stream.Throttle(delay);
        return stream.Window(() => closes).SelectMany(window => window.ToList());
    }

The powerful Window method did the heavy lifting. Now …

With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running?

Here is a method that is similar to Dave’s but uses Sample instead (which is more appropriate than Buffer). I’ve included an extension method similar to the one I added to Dave’s answer. The extension:

    public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
    {
        var sampler = new Subject<Unit>();
        var sub = source.
            Sample(sampler).
            ObserveOn(Scheduler.ThreadPool).
            …

Write an Rx “RetryAfter” extension method

The key to this implementation of a back-off retry is deferred observables. A deferred observable won’t execute its factory until someone subscribes to it, and it invokes the factory again for each subscription, which makes it ideal for our retry scenario. Assume we have a method which triggers a network request.

    public IObservable<WebResponse> SomeApiMethod() { …
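To make the two properties of deferral concrete (nothing runs before a subscription, and the factory runs again for every subscription), here is a minimal sketch that uses only the BCL `IObservable<T>` and `IObserver<T>` interfaces. It is not the code from the answer: `Deferred<T>`, `ReturnOnce<T>`, `IgnoreObserver<T>`, `EmptyDisposable` and `DeferDemo` are hypothetical names introduced for illustration, and the counter merely stands in for triggering a request. In Rx.NET itself the built-in operator for this is `Observable.Defer`.

```csharp
using System;

// Hypothetical stand-in for a deferred observable: it stores a factory and
// runs it once per Subscribe call, never earlier.
public sealed class Deferred<T> : IObservable<T>
{
    private readonly Func<IObservable<T>> _factory;
    public Deferred(Func<IObservable<T>> factory) => _factory = factory;

    public IDisposable Subscribe(IObserver<T> observer) => _factory().Subscribe(observer);
}

// Hypothetical helper: emits one value, then completes.
public sealed class ReturnOnce<T> : IObservable<T>
{
    private readonly T _value;
    public ReturnOnce(T value) => _value = value;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnNext(_value);
        observer.OnCompleted();
        return EmptyDisposable.Instance;
    }
}

public sealed class EmptyDisposable : IDisposable
{
    public static readonly EmptyDisposable Instance = new EmptyDisposable();
    public void Dispose() { }
}

// Hypothetical observer that discards everything it receives.
public sealed class IgnoreObserver<T> : IObserver<T>
{
    public void OnNext(T value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class DeferDemo
{
    // Returns the factory call count before subscribing, after one
    // subscription, and after two subscriptions.
    public static int[] Run()
    {
        var calls = 0;
        var deferred = new Deferred<int>(() =>
        {
            calls++; // stands in for "trigger the network request"
            return new ReturnOnce<int>(calls);
        });

        var before = calls;                              // factory not run yet
        deferred.Subscribe(new IgnoreObserver<int>());
        var afterFirst = calls;                          // run once
        deferred.Subscribe(new IgnoreObserver<int>());
        var afterSecond = calls;                         // run again
        return new[] { before, afterFirst, afterSecond };
    }
}
```

Calling `DeferDemo.Run()` returns the counts 0, 1, 2: creating the deferred observable does no work, and each subscription (as a retry would be) triggers the factory afresh.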

How can I see what my reactive extensions query is doing?

You can append this function liberally to your Rx operators while you are developing them to see what’s happening:

    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}", opName, Thread.CurrentThread.ManagedThreadId);
        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}", opName, Thread.CurrentThread.ManagedThreadId);
            …

What are hot and cold observables?

From Anton Moiseev’s book “Angular Development with TypeScript, Second Edition”:

Hot and cold observables

There are two types of observables: hot and cold. The main difference is that a cold observable creates a data producer for each subscriber, whereas a hot observable creates a data producer first, and each subscriber gets the data from …
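The producer-per-subscriber distinction can be sketched in a few lines. The example below is an illustration, not taken from the book: it is written in C# to match the Rx.NET excerpts on this page, it uses only the BCL `IObservable<T>` and `IObserver<T>` interfaces, and `ColdCounter`, `HotCounter`, `Recorder`, `NoopDisposable` and `HotColdDemo` are hypothetical names. Unsubscription is deliberately left out to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;

// Cold: every Subscribe call starts its own producer (here, a fresh counter).
public sealed class ColdCounter : IObservable<int>
{
    private readonly int _count;
    public ColdCounter(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 1; i <= _count; i++) observer.OnNext(i); // new sequence per subscriber
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }
}

// Hot: one producer is shared; a subscriber only sees values emitted
// after it subscribed.
public sealed class HotCounter : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();
    private int _next = 1;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return NoopDisposable.Instance; // unsubscription omitted in this sketch
    }

    // The single shared producer: pushes the next value to current listeners.
    public void Emit()
    {
        var value = _next++;
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }
}

public sealed class NoopDisposable : IDisposable
{
    public static readonly NoopDisposable Instance = new NoopDisposable();
    public void Dispose() { }
}

// Collects received values so the two behaviours can be compared.
public sealed class Recorder : IObserver<int>
{
    public List<int> Values { get; } = new List<int>();
    public void OnNext(int value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class HotColdDemo
{
    public static (List<int> ColdA, List<int> ColdB, List<int> HotA, List<int> HotB) Run()
    {
        var cold = new ColdCounter(3);
        var coldA = new Recorder();
        var coldB = new Recorder();
        cold.Subscribe(coldA);
        cold.Subscribe(coldB); // gets its own full sequence

        var hot = new HotCounter();
        var hotA = new Recorder();
        var hotB = new Recorder();
        hot.Subscribe(hotA);
        hot.Emit();            // only hotA is listening
        hot.Subscribe(hotB);   // joins late
        hot.Emit();
        hot.Emit();

        return (coldA.Values, coldB.Values, hotA.Values, hotB.Values);
    }
}
```

With `HotColdDemo.Run()`, both cold subscribers receive 1, 2, 3, because each one triggered its own producer. The first hot subscriber receives 1, 2, 3, while the late one receives only 2, 3, because it shares a producer that had already emitted a value.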

How can I implement an exhaustMap handler in Rx.NET?

Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it’s still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the …