How to use Observable.FromEvent instead of FromEventPattern and avoid string literal event names

Summary

The first point to make is that you don’t actually need to use Observable.FromEvent to avoid the string literal reference. This version of FromEventPattern will work:

var groupedKeyPresses =
    Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
        h => KeyPress += h,
        h => KeyPress -= h)
        .Select(k => k.EventArgs.KeyChar)
        .GroupBy(k => k);

If you do want to make FromEvent work, you can do it like this:

var groupedKeyPresses =
    Observable.FromEvent<KeyPressEventHandler, KeyPressEventArgs>(
        handler =>
        {
            KeyPressEventHandler kpeHandler = (sender, e) => handler(e);
            return kpeHandler;
        }, 
        h => KeyPress += h,
        h => KeyPress -= h)
        .Select(k => k.KeyChar)
        .GroupBy(k => k);

Why? It is because the FromEvent operator exists to work with any event delegate type.

The first parameter here is a conversion function that connects the event to the Rx subscriber. It accepts the OnNext handler of an observer (an Action<T>) and returns a handler compatible with the underlying event delegate that will invoke that OnNext handler. This generated handler can then be subscribed to the event.

I never liked the official MSDN documentation for this function, so here is an expanded explanation that walks through its usage piece by piece.

The Lowdown on Observable.FromEvent

The following breaks down why FromEvent exists and how it works:

Review of how .NET event subscriptions work

Consider how .NET events work. These are implemented as delegate chains. Standard event delegates follow the pattern of delegate void FooHandler(object sender, EventArgs eventArgs), but in actuality events can work with any delegate type (even those with a return type!). We subscribe to an event by passing in an appropriate delegate to a special function that adds it to a delegate chain (typically via the += operator), or if no handlers are subscribed yet, the delegate becomes the root of the chain. This is why we must do a null check when raising an event.

When the event is raised, (typically) the delegate chain is invoked so that each delegate in the chain is called in turn. To unsubscribe from a .NET event, the delegate is passed into a special function (typically via the -= operator) so that it can be removed from the delegate chain (the chain is walked until a matching reference is found, and that link is removed from the chain).
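To make the delegate chain behaviour concrete, here is a minimal sketch using a plain Action<string> field rather than a real event. The class and variable names (DelegateChainDemo, first, second, chain) are made up for illustration; only the += and -= semantics come from .NET itself.

```csharp
using System;
using System.Collections.Generic;

public static class DelegateChainDemo
{
    // Returns a log of what happened so the behaviour is easy to inspect.
    public static List<string> Run()
    {
        var log = new List<string>();
        Action<string> first = msg => log.Add("first: " + msg);
        Action<string> second = msg => log.Add("second: " + msg);

        Action<string> chain = null;   // no subscribers yet, so the chain is null
        chain += first;                // first becomes the root of the chain
        chain += second;               // second is appended after first
        chain("a");                    // both delegates run, in subscription order

        chain -= first;                // the matching delegate is removed
        chain("b");                    // only second runs now

        chain -= second;               // removing the last delegate leaves null again
        log.Add("chain is null: " + (chain == null));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Running this should log "first: a", "second: a", "second: b" and finally "chain is null: True", which is why a null check is needed before raising an event that may have no subscribers.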

Let’s create a simple but non-standard .NET event implementation. Here I am using the less common add/remove syntax to expose the underlying delegate chain and enable us to log subscription and unsubscription. Our non-standard event features a delegate with parameters of an integer and a string rather than the usual object sender and EventArgs subclass:

public delegate void BarHandler(int x, string y);

public class Foo
{  
    private BarHandler delegateChain;

    public event BarHandler BarEvent
    {
        add
        {
            delegateChain += value;                
            Console.WriteLine("Event handler added");
        }
        remove
        {
            delegateChain -= value;
            Console.WriteLine("Event handler removed");
        }
    }

    public void RaiseBar(int x, string y)
    {
        // Copy the chain to a local so it cannot become null between the check and the call.
        var temp = delegateChain;
        if(temp != null)
        {
            temp(x, y);
        }
    }
}

Review of how Rx subscriptions work

Now consider how Observable streams work. A subscription to an observable is formed by calling the Subscribe method and passing an object that implements the IObserver<T> interface, which has the OnNext, OnCompleted and OnError methods called by the observable to handle events. Additionally the Subscribe method returns an IDisposable handle that can be disposed to unsubscribe.

More typically, we use convenience extension methods that overload Subscribe. These extensions accept delegate handlers conforming to the OnXXX signatures and transparently create an AnonymousObserver<T> whose OnXXX methods will invoke those handlers.
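As a rough illustration of the two styles of subscription, the sketch below subscribes to the same source once with an explicit IObserver<int> and once with the delegate overload. It assumes the System.Reactive package is referenced; the Subject<int> source and the SubscribeDemo name are only there for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Subjects;

public static class SubscribeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();   // a simple hand-driven observable

        // Style 1: pass an object implementing IObserver<int>.
        IObserver<int> observer = Observer.Create<int>(
            x => log.Add("observer OnNext: " + x),
            ex => log.Add("observer OnError: " + ex.Message),
            () => log.Add("observer OnCompleted"));
        IDisposable first = source.Subscribe(observer);

        // Style 2: pass just an OnNext delegate; Rx wraps it in an observer for us.
        IDisposable second = source.Subscribe(x => log.Add("delegate OnNext: " + x));

        source.OnNext(1);    // both subscribers see this value
        first.Dispose();     // unsubscribe the explicit observer
        source.OnNext(2);    // only the delegate subscriber sees this value
        second.Dispose();    // unsubscribe the delegate subscriber
        source.OnNext(3);    // nobody is listening any more
        return log;
    }
}
```

The log should contain "observer OnNext: 1", "delegate OnNext: 1" and "delegate OnNext: 2", showing that disposing the handle returned by Subscribe is what ends each subscription.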

Bridging .NET and Rx events

So how can we create a bridge to extend .NET events into the Rx observable streams? The result of calling Observable.FromEvent is to create an IObservable whose Subscribe method acts like a factory that will create this bridge.

The .NET event pattern has no representation of completed or error events, only of an event being raised. In other words, we need only bridge three aspects of the event, which map to Rx as follows:

  1. Subscription, e.g. a call to IObservable<T>.Subscribe(someObserver) maps to fooInstance.BarEvent += barHandlerInstance.
  2. Invocation, e.g. a call to barHandlerInstance(int x, string y) maps to someObserver.OnNext(T arg).
  3. Unsubscription, e.g. assuming we preserve the IDisposable handle returned from our Subscribe call in a variable called subscription, a call to subscription.Dispose() maps to fooInstance.BarEvent -= barHandlerInstance.

Note that it’s only the act of calling Subscribe that creates the subscription. So the Observable.FromEvent call is returning a factory supporting subscription to, invocation of, and unsubscription from the underlying event. At this point, there is no event subscription taking place. Only at the point of calling Subscribe will the observer be available, along with its OnNext handler. Therefore, the FromEvent call must accept factory methods it can use to implement the three bridging actions at the appropriate time.
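One way to picture the three bridging actions is to write the bridge by hand with Observable.Create. This is only a sketch of the idea, not how FromEvent is actually implemented; the ManualBridge class and its ToObservable helper are hypothetical, and Foo is reduced to a plain field-like event to keep the example short. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public delegate void BarHandler(int x, string y);

public class Foo
{
    public event BarHandler BarEvent;
    public void RaiseBar(int x, string y) => BarEvent?.Invoke(x, y);
}

public static class ManualBridge
{
    // Hypothetical helper: nothing touches the event until Subscribe is called.
    public static IObservable<string> ToObservable(Foo foo)
    {
        return Observable.Create<string>(observer =>
        {
            // Invocation: each event call is converted into an OnNext call.
            BarHandler handler = (x, y) => observer.OnNext("X:" + x + " Y:" + y);

            // Subscription: attach the handler now that we have an observer.
            foo.BarEvent += handler;

            // Unsubscription: disposing the handle detaches the same handler.
            return Disposable.Create(() => foo.BarEvent -= handler);
        });
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var foo = new Foo();
        var bar = ToObservable(foo);

        foo.RaiseBar(0, "ignored");            // no Rx subscriber yet, so no handler is attached
        var subscription = bar.Subscribe(s => log.Add(s));
        foo.RaiseBar(1, "a");                  // delivered as OnNext("X:1 Y:a")
        subscription.Dispose();
        foo.RaiseBar(2, "b");                  // handler already removed
        return log;
    }
}
```

Here the log should end up holding only "X:1 Y:a". Note that this hand-written version attaches one event handler per Rx subscriber, which is simpler than what FromEvent does.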

The FromEvent Type Arguments

So now let’s consider a correct implementation of FromEvent for the above event.

Recall that OnNext handlers only accept a single argument. .NET event handlers can have any number of parameters. So our first decision is to select a single type to represent event invocations in the target observable stream.

In fact, this can be any type you want to appear in your target observable stream. It’s the job of the conversion function (discussed shortly) to provide the logic to convert the event invocation into an OnNext invocation – and there’s plenty of freedom to decide how this happens.

Here we will map the int x, string y arguments of a BarEvent invocation into a formatted string describing both values. In other words we will cause a call to fooInstance.RaiseBar(1, "a") to result in an invocation of someObserver.OnNext("X:1 Y:a").

This example should put to rest a very common source of confusion: What do the type parameters of FromEvent represent? Here the first type BarHandler is the source .NET event delegate type, the second type is the target OnNext handler’s argument type. Because this second type is often an EventArgs subclass it’s often assumed that it must be some necessary part of the .NET event delegate – a lot of people miss the fact its relevance is really due to the OnNext handler. So the first part of our FromEvent call looks like this:

 var observableBar = Observable.FromEvent<BarHandler, string>(

The Conversion Function

Now let’s consider the first argument to FromEvent, the so-called conversion function. (Note that some overloads of FromEvent omit the conversion function; more on this later.)

The lambda syntax can be truncated quite a bit thanks to type inference, so here’s a long-hand version to start with:

(Action<string> onNextHandler) =>
{
    BarHandler barHandler = (int x, string y) =>
    {
        onNextHandler("X:" + x + " Y:" + y);
    };
    return barHandler;
}

So this conversion function is a factory function that when invoked creates a handler compatible with the underlying .NET event. The factory function accepts an OnNext delegate. This delegate should be invoked by the returned handler in response to the handler function being invoked with the underlying .NET event arguments. The delegate will be invoked with the result of converting the .NET event arguments to an instance of the OnNext parameter type. So from the above example we can see that the factory function will be called with an onNextHandler of type Action<string> – it must be invoked with a string value in response to each .NET event invocation. The factory function creates a delegate handler of type BarHandler for the .NET event that handles event invocations by invoking the onNextHandler with a formatted string created from the arguments of the corresponding event invocation.

With a bit of type inference, we can collapse the above code to the following equivalent code:

onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y)

The conversion function therefore fulfils some of the event Subscription logic in providing a function to create an appropriate event handler, and it also does the work to map the .NET event invocation to the Rx OnNext handler invocation.

As mentioned previously, there are overloads of FromEvent that omit the conversion function. This is because it is not required if the event delegate is already compatible with the method signature required for OnNext.
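For instance, an event declared with an Action<int> delegate already has the shape of an OnNext handler, so the single-type-argument overload can be used with just the add and remove functions. The Counter class and its Changed event below are invented for this sketch, which assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical class whose event delegate is already an Action<int>.
public class Counter
{
    public event Action<int> Changed;
    public void Set(int value) => Changed?.Invoke(value);
}

public static class NoConversionDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        var counter = new Counter();

        // No conversion function: the OnNext handler itself is used as the event handler.
        IObservable<int> changes = Observable.FromEvent<int>(
            h => counter.Changed += h,
            h => counter.Changed -= h);

        using (changes.Subscribe(v => seen.Add(v)))
        {
            counter.Set(1);
            counter.Set(2);
        }

        counter.Set(3);   // raised after disposal, so it is not observed
        return seen;
    }
}
```

In a console application this should collect the values 1 and 2 only.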

The add/remove handlers

The remaining two arguments are the addHandler and removeHandler, which are responsible for subscribing and unsubscribing the created delegate handler to the actual .NET event. Assuming we have an instance of Foo called foo, the completed FromEvent call looks like this:

var observableBar = Observable.FromEvent<BarHandler, string>(
    onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y),
    h => foo.BarEvent += h,
    h => foo.BarEvent -= h);

It’s up to us to decide how the event we are going to bridge is procured – so we provide the add and remove handler functions that expect to be provided the created conversion handler. The event is typically captured via a closure, as in the above example where we close over a foo instance.
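Since the second type argument can be any type, the formatted string is just one choice. As a variation, the sketch below projects the two event arguments into a value tuple so that later operators can work with the individual values. The tuple element names X and Y and the TupleDemo class are my own, Foo is again reduced to a plain field-like event, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public delegate void BarHandler(int x, string y);

public class Foo
{
    public event BarHandler BarEvent;
    public void RaiseBar(int x, string y) => BarEvent?.Invoke(x, y);
}

public static class TupleDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var foo = new Foo();

        // The conversion function packs both event arguments into one tuple value.
        var observableBar = Observable.FromEvent<BarHandler, (int X, string Y)>(
            onNextHandler => (x, y) => onNextHandler((x, y)),
            h => foo.BarEvent += h,
            h => foo.BarEvent -= h);

        // Downstream operators can now use the arguments separately.
        using (observableBar
            .Where(e => e.X > 1)
            .Subscribe(e => log.Add(e.X + ":" + e.Y)))
        {
            foo.RaiseBar(1, "a");   // filtered out by Where
            foo.RaiseBar(2, "b");   // logged as "2:b"
        }

        return log;
    }
}
```

The log should contain the single entry "2:b".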

Now we have all the pieces for the FromEvent observable to fully implement subscription, invocation and unsubscription.

Just one more thing…

There’s one final piece of glue to mention. Rx optimizes the subscriptions to the .NET event. In reality, for any given number of subscribers to the observable, just one single subscription is made to the underlying .NET event. This is then multicast to the Rx subscribers via the Publish mechanism. It’s as if a Publish().RefCount() had been appended to the observable.

Consider the following example using the delegate and class defined above:

public static void Main()
{
    var foo = new Foo();

    var observableBar = Observable.FromEvent<BarHandler, string>(
        onNextHandler => (int x, string y)
            => onNextHandler("X:" + x + " Y:" + y),
        h => foo.BarEvent += h,
        h => foo.BarEvent -= h);

    var xs = observableBar.Subscribe(x => Console.WriteLine("xs: " + x));
    foo.RaiseBar(1, "First");    
    var ys = observableBar.Subscribe(x => Console.WriteLine("ys: " + x));
    foo.RaiseBar(1, "Second");
    xs.Dispose();
    foo.RaiseBar(1, "Third");    
    ys.Dispose();
}

This produces the following output, demonstrating that just a single subscription is made:

Event handler added
xs: X:1 Y:First
xs: X:1 Y:Second
ys: X:1 Y:Second
ys: X:1 Y:Third
Event handler removed

I do hope this helps clear up any lingering confusion over how this complex function works!
