Write an Rx “RetryAfter” extension method

The key to this implementation of a back off retry is deferred observables. A deferred observable won’t execute its factory until someone subscribes to it, and it invokes the factory again for each new subscription, which makes it ideal for our retry scenario.
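To see that behaviour in isolation, here is a minimal sketch, assuming the System.Reactive package is referenced. FakeApiMethod and FactoryCalls are made-up names standing in for a real API call; they only count how often the factory runs.

```csharp
using System;
using System.Reactive.Linq;

static class DeferDemo
{
    public static int FactoryCalls = 0;

    // Hypothetical stand-in for a real API call: counts its own invocations.
    public static IObservable<int> FakeApiMethod()
    {
        FactoryCalls++;
        return Observable.Return(FactoryCalls);
    }

    public static void Main()
    {
        // Wrapping the call in Defer postpones it until subscription time.
        var source = Observable.Defer(() => FakeApiMethod());
        Console.WriteLine(FactoryCalls); // 0: nothing has subscribed yet

        source.Subscribe(_ => { });
        source.Subscribe(_ => { });
        Console.WriteLine(FactoryCalls); // 2: one invocation per subscription
    }
}
```

Since Retry works by resubscribing to its source after an error, each retry of a deferred observable ends up calling the factory again.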

Assume we have a method which triggers a network request.

public IObservable<WebResponse> SomeApiMethod() { ... }

For the purposes of this little snippet, let’s define the deferred observable as source:

var source = Observable.Defer(() => SomeApiMethod());

Whenever someone subscribes to source it will invoke SomeApiMethod and launch a new web request. The naive way to retry it whenever it fails would be to use the built-in Retry operator.

source.Retry(4)

That wouldn’t be very nice to the API though and it’s not what you’re asking for. We need to delay the launching of requests in between each attempt. One way of doing that is with a delayed subscription.

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

That’s not ideal since it’ll add the delay even on the first request. Let’s fix that.

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)));
})
.Retry(4)
.Select(response => ...)

Just pausing for a second isn’t a very good retry method though, so let’s change that constant to a function which receives the retry count and returns an appropriate delay. A back off that grows with each retry is easy enough to implement; the one below waits n squared seconds before retry n.

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
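To get a feel for the delays this produces, here is a small sketch that prints them for the first three retries. The Doubling strategy is my own addition for comparison and not part of the code above: strictly speaking n squared is quadratic growth, while a textbook exponential back off would use 2 to the power of n.

```csharp
using System;

static class BackoffDelays
{
    // The strategy from the text: the delay grows with the square of the retry number.
    public static readonly Func<int, TimeSpan> Quadratic =
        n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    // Hypothetical alternative for comparison: the delay doubles on every retry.
    public static readonly Func<int, TimeSpan> Doubling =
        n => TimeSpan.FromSeconds(Math.Pow(2, n));

    public static void Main()
    {
        for (int retry = 1; retry <= 3; retry++)
        {
            Console.WriteLine(
                $"retry {retry}: {Quadratic(retry).TotalSeconds}s vs {Doubling(retry).TotalSeconds}s");
        }
        // retry 1: 1s vs 2s
        // retry 2: 4s vs 4s
        // retry 3: 9s vs 8s
    }
}
```

For a handful of retries the two are close, so either should work; the difference only starts to matter with larger retry counts.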

We’re almost done now. We just need a way of specifying which exceptions we should retry for. Let’s add a function that, given an exception, returns whether or not it makes sense to retry. We’ll call it retryOnError.

Now we need to write some scary-looking code, but bear with me.

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<WebResponse>(t.Item3))

All of those angle brackets are there to marshal an exception for which we shouldn’t retry past the .Retry(). We’ve made the inner observable an IObservable<Tuple<bool, WebResponse, Exception>>, where the bool indicates whether we have a response or an exception. If retryOnError indicates that we should retry for a particular exception, the inner observable throws and the Retry picks that up. Otherwise the exception is wrapped in a Tuple and passes through the Retry as an ordinary item. The SelectMany just unwraps our Tuple and makes the resulting observable an IObservable<WebResponse> again.
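Putting the pieces together, a generic version might look roughly like the sketch below. This is my reconstruction from the snippets above, not the gist's code: the name RetryWithBackoffSketch, the optional scheduler parameter and the default values are all assumptions. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RetrySketchExtensions
{
    // Hypothetical name and signature, assembled from the snippets above.
    public static IObservable<T> RetryWithBackoffSketch<T>(
        this IObservable<T> source,
        int retryCount,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        // Assumed defaults: n squared seconds, retry on every exception.
        strategy = strategy ?? (n => TimeSpan.FromSeconds(Math.Pow(n, 2)));
        retryOnError = retryOnError ?? (e => true);
        scheduler = scheduler ?? Scheduler.Default;

        // The outer Defer gives every subscriber its own attempt counter.
        return Observable.Defer(() =>
        {
            int attempt = 0;
            return Observable.Defer(() =>
                    (++attempt == 1
                        ? source
                        : source.DelaySubscription(strategy(attempt - 1), scheduler))
                    // Wrap values so non-retryable errors can pass Retry as items.
                    .Select(item => Tuple.Create(true, item, (Exception)null))
                    .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                        ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                        : Observable.Return(Tuple.Create(false, default(T), e))))
                .Retry(retryCount)
                // Unwrap: emit the value, or rethrow the marshalled exception.
                .SelectMany(t => t.Item1
                    ? Observable.Return(t.Item2)
                    : Observable.Throw<T>(t.Item3));
        });
    }
}
```

Two things are worth noting. Keeping attempt inside an outer Defer means two subscribers don't share a counter, which the earlier snippets with a captured local would do. And as far as I can tell, Retry(retryCount) counts total subscriptions, so a retryCount of 4 means at most four attempts overall rather than one attempt plus four retries.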

See my gist with full source and tests for the final version. Having this operator allows us to write our retry code quite succinctly:

Observable.Defer(() => SomeApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
