Do we need to unsubscribe from observable that completes/errors-out?

The Subscribing and Unsubscribing section of the Observable Contract is definitive regarding your question. It states:

“When an Observable issues an OnError or OnComplete notification to its observers, this ends the subscription. Observers do not need to issue an Unsubscribe notification to end subscriptions that are ended by the Observable in this way.”

This is …
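The contract rule quoted in this excerpt can be illustrated with a small, self-contained sketch. It is written in plain Java and does not use Rx.NET or RxJava; the `Observer` interface and `SafeObserver` wrapper are hypothetical, hand-rolled names made up for the illustration. The sketch only assumes the behavior the contract describes: once a terminal notification has been delivered, the subscription is over and nothing further reaches the observer, without any explicit unsubscribe call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, hand-rolled types for illustration only; this is NOT the Rx.NET or RxJava API.
public class ContractSketch {

    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Wraps a downstream observer and treats the first terminal notification
    // (onError or onComplete) as the end of the subscription.
    static final class SafeObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private boolean terminated;

        SafeObserver(Observer<T> downstream) {
            this.downstream = downstream;
        }

        boolean isTerminated() {
            return terminated;
        }

        @Override
        public void onNext(T value) {
            // Items arriving after termination are dropped.
            if (!terminated) {
                downstream.onNext(value);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (terminated) {
                return;
            }
            terminated = true;
            downstream.onError(error);
        }

        @Override
        public void onComplete() {
            if (terminated) {
                return;
            }
            terminated = true;
            downstream.onComplete();
        }
    }

    // Emits 1 and 2, completes, then misbehaves by emitting 3.
    // Returns a log of what the downstream observer actually saw.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SafeObserver<Integer> observer = new SafeObserver<>(new Observer<Integer>() {
            @Override public void onNext(Integer v) { log.add("next:" + v); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });

        observer.onNext(1);
        observer.onNext(2);
        observer.onComplete();   // terminal notification: the subscription ends here
        observer.onNext(3);      // ignored, and no unsubscribe call was ever made

        log.add("terminated:" + observer.isTerminated());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real Rx library the same bookkeeping happens inside the library, which is why calling unsubscribe or dispose after completion is harmless but unnecessary.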

Pause and Resume Subscription on cold IObservable

Here’s a reasonably simple Rx way to do what you want. I’ve created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

    public static IObservable<T> Pausable<T>(
        this IObservable<T> source,
        IObservable<bool> pauser)
    {
        return Observable.Create<T>(o =>
        {
            var paused = new SerialDisposable();
            var …

Reactive Extensions for .NET (Rx): Take action once all events are completed

You can use Observable.When like so (I’ve added an additional CheckBox for this example):

    var checkChanged = Observable.FromEvent<EventArgs>(this.checkBox, "CheckedChanged");
    var check1Changed = Observable.FromEvent<EventArgs>(this.checkBox1, "CheckedChanged");
    var keyPress = Observable.FromEvent<KeyPressEventArgs>(this.textBox, "KeyPress");

    var plan1 = checkChanged
        .And(check1Changed)
        .And(keyPress)
        .Then((cc, cc1, kp) => "Done.");

    var when = Observable.When(plan1);
    when.Subscribe((result) => this.resultTextBox.Text = result);

Also, if you can join other plans …

How to call back async function from Rx Subscribe?

Ana Betts’ answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish you need something like this:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(l => Observable.FromAsync(asyncMethod))
        .Concat()
        .Subscribe();

Or:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
        .Concat()
        .Subscribe();

RxJava and parallel execution of observer code

RxJava is often misunderstood when it comes to its asynchronous and multithreaded aspects. Coding multithreaded operations is simple, but understanding the abstraction is another matter. A common question about RxJava is how to achieve parallelization, or how to emit multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract, which states …

What’s a good way to run periodic tasks using Rx, with a single concurrent execution restriction?

You are on the right track: you can use Select + Concat to flatten out the observable and limit the number of in-flight requests. (Note: if your task takes longer than the interval time, the pending executions will start to stack up, since they can’t execute fast enough.)

    var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) // I assume you are …

How to use Observable.FromEvent instead of FromEventPattern and avoid string literal event names

Summary

The first point to make is that you don’t actually need to use Observable.FromEvent to avoid the string literal reference. This version of FromEventPattern will work:

    var groupedKeyPresses =
        Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            h => KeyPress += h,
            h => KeyPress -= h)
        .Select(k => k.EventArgs.KeyChar)
        .GroupBy(k => k);

If you do want to make FromEvent …

What are the default Schedulers for each observable operator?

Wow, that was not trivial to find… Deep within the bowels of the System.Reactive.Concurrency namespace, there is an internal static class called SchedulerDefaults, which is declared as:

    internal static class SchedulerDefaults
    {
        internal static IScheduler AsyncConversions { get { return DefaultScheduler.Instance; }}
        internal static IScheduler ConstantTimeOperations { get { return ImmediateScheduler.Instance; }}
        internal static IScheduler …