This is possible by combining the built-in Window and Throttle methods of Observable. First, let’s solve the simpler problem where we ignore the maximum count condition:
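    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
    {
        // Emits once the stream has been silent for `delay`; each emission closes the current window.
        // Note: `stream` is subscribed here and again by Window, so this assumes a hot observable.
        var closes = stream.Throttle(delay);
        return stream.Window(() => closes).SelectMany(window => window.ToList());
    }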
The powerful Window method did the heavy lifting. Now it’s easy enough to see how to add a maximum count:
    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, int? max = null)
    {
        var closes = stream.Throttle(delay);
        if (max != null)
        {
            // Window re-subscribes to `closes` for every new window, so the index restarts
            // per window and this fires once the current window has seen `max` items.
            var overflows = stream.Where((x, index) => index + 1 >= max);
            closes = closes.Merge(overflows);
        }
        return stream.Window(() => closes).SelectMany(window => window.ToList());
    }
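For illustration, here is a minimal usage sketch. It assumes the System.Reactive package is referenced; the Demo class, the ObservableExtensions class name, the 200 ms delay and the Subject-based source are all hypothetical choices made for the example, not part of the original answer. A Subject is used because it is hot, so the separate subscriptions made by Throttle, Where and Window all observe the same events.

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;

    public static class ObservableExtensions
    {
        // Same operator as above, copied here so the sketch compiles on its own.
        public static IObservable<IList<T>> BufferUntilInactive<T>(
            this IObservable<T> stream, TimeSpan delay, int? max = null)
        {
            var closes = stream.Throttle(delay);
            if (max != null)
            {
                var overflows = stream.Where((x, index) => index + 1 >= max);
                closes = closes.Merge(overflows);
            }
            return stream.Window(() => closes).SelectMany(window => window.ToList());
        }
    }

    public static class Demo
    {
        public static void Main()
        {
            // Hypothetical hot source standing in for keystrokes, sensor readings, etc.
            var keys = new Subject<char>();

            using (keys.BufferUntilInactive(TimeSpan.FromMilliseconds(200), max: 3)
                       .Subscribe(batch => Console.WriteLine("[" + string.Join(",", batch) + "]")))
            {
                foreach (var c in "abcd") keys.OnNext(c); // burst longer than max
                Thread.Sleep(500);                         // quiet period longer than the delay
                keys.OnNext('e');                          // a lone item after the pause
                Thread.Sleep(500);
                keys.OnCompleted();                        // completes the last open window
            }
        }
    }

The burst should be split by the max condition and the remainder flushed once the stream goes quiet. The exact batches depend on timing, and an empty list may be emitted when the source completes while a window is still open. With a cold source the repeated subscriptions would each replay the sequence independently, so sharing the source first (for example with Publish) is probably needed in that case.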
I’ll write a post explaining this on my blog. https://gist.github.com/2244036
Documentation for the Window method: