It’s actually tricker than it sounds.
Using Delay
doesn’t work because the values will still happen in bulk, only slightly delayed.
Using Interval
with either CombineLatest
or Zip
doesn’t work, since the former will cause source values to be skipped and the latter will buffer interval values.
I think the new Drain
operator (added in 1.0.2787.0), combined with Delay
should do the trick:
source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
The Drain
operator works like SelectMany
, but waits until the previous output completes before calling the selector with the next value. It’s still not exactly what you are after (the first value in a block will also be delayed), but it’s close: The usage above matches your marble diagram now.
Edit: Apparently the Drain
in the framework doesn’t work like SelectMany
. I’ll ask for some advice in the official forums. In the meantime, here’s an implementation of Drain that does what you’re after:
Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { }, () => queue.OnNext(new Unit()))
);
});
}
}