A way to push buffered events in even intervals

It’s actually trickier than it sounds.

Using Delay doesn’t work because the values will still arrive in bulk, only slightly later.

Using Interval with either CombineLatest or Zip doesn’t work either: CombineLatest causes source values to be skipped, and Zip buffers interval values, so a burst that arrives after a quiet period is released all at once.
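
To see the Zip problem concretely, here’s a minimal sketch. It assumes the current System.Reactive package and its namespaces, and the source shape (quiet for 5.5 seconds, then a burst of three values) is made up purely for illustration:

```csharp
using System;
using System.Reactive.Linq;

class ZipWithIntervalDemo
{
    static void Main()
    {
        // Hypothetical source: silent for 5.5 seconds, then 1, 2, 3 in one burst.
        var source = Observable.Timer(TimeSpan.FromSeconds(5.5))
            .SelectMany(_ => Observable.Range(1, 3));

        // Attempted pacing: pair each value with a one-second tick.
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1));
        var paced = source.Zip(ticks, (value, tick) => value);

        // Zip has queued the ticks that fired during the quiet period, so the
        // burst is paired with them immediately instead of one value per second.
        paced.Timestamp().Subscribe(
            t => Console.WriteLine($"{t.Timestamp:HH:mm:ss.fff}  {t.Value}"));

        Console.ReadLine(); // keep the process alive while the timers run
    }
}
```

The three timestamps should come out nearly identical, which is exactly the bulk delivery you’re trying to avoid.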

I think the new Drain operator (added in 1.0.2787.0), combined with Delay, should do the trick:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

The Drain operator works like SelectMany, but waits until the previous output completes before calling the selector with the next value. It’s still not exactly what you are after (the first value in a block will also be delayed), but it’s close: The usage above matches your marble diagram now.

Edit: Apparently the Drain in the framework doesn’t work like SelectMany. I’ll ask for some advice in the official forums. In the meantime, here’s an implementation of Drain that does what you’re after:

Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.

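// Namespaces as found in the current System.Reactive package (an assumption;
// older builds of Rx may place these types elsewhere).
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        // Defer gives every subscriber its own queue.
        return Observable.Defer(() =>
        {
            // Emits one Unit each time the next source value may be processed;
            // the initial Unit lets the first value through immediately.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                // Zip holds each source value back until the queue signals.
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    // When this inner sequence completes, release the next value.
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}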
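
Here’s a small sketch of how the extension above could be exercised. It assumes the ObservableDrainExtensions class is compiled into the same project, that the current System.Reactive package is referenced, and that the Rx build in use has no built-in Drain that would clash with it. The three-value source is just an illustration:

```csharp
using System;
using System.Reactive.Linq;

class DrainDemo
{
    static void Main()
    {
        // Hypothetical source: three values arriving in a single burst.
        var source = Observable.Range(1, 3);

        // Each value is emitted as soon as it is let through; the inner
        // sequence then stays open for one second before completing, which
        // is what holds back the next value.
        var paced = source.Drain(x =>
            Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

        paced.Timestamp().Subscribe(
            t => Console.WriteLine($"{t.Timestamp:HH:mm:ss.fff}  {t.Value}"));

        Console.ReadLine(); // keep the process alive while the timers run
    }
}
```

The printed timestamps should be roughly one second apart, with the first value appearing immediately.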
