Sunday 31 May 2009

Reacting to the Reactive Framework: Part 2

DISCLAIMER: The code in this post was the result of a big hackfest with the single goal of getting a particular use case working. I am telling you right now that I paid no attention to potential concurrency problems and that if you try to reuse any of this stuff you are just asking for trouble.

In the previous post, I explained how Eric Meijer’s recent Lang.NET presentation on the LiveLabs Reactive Framework piqued my interest. Wondering how the framework works, I decided to have a go at emulating some of the basic functionality it appears to provide. This code demonstrates the functionality I’ve implemented so far:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

Today I am going to work my way through the query comprehension you can see here and do my best to explain how I got this working. Perhaps a good place to start would be to convert the query comprehension into a method chain in the same way the C# compiler does for us under the covers. Resharper can do this automatically, which is convenient:

IObservable<int> observable = button1.GetMouseDowns().SelectMany(
                    md => slowOperation.AsAsyncObservable(md.EventArgs), 
                    (md, x) => x
                    );

The first call here is to an extension method called GetMouseDowns:

public static IObservable<EventResult<Button, MouseEventArgs>> GetMouseDowns(this Button b)
{
    var wrapper = new EventWrapper<Button, MouseEventArgs>();
    b.MouseDown += wrapper.Handle;
    return wrapper;
}

So far, this is relatively straightforward. The GetMouseDowns extension method creates an EventWrapper, and wires that EventWrapper up to the MouseDown event on the button. You can see that the wrapper is being returned as an IObservable of an EventResult. Here is the code for these:

public interface IObservable<T>
{
    void Attach(Action<T> action);
}
public class EventResult<TSender, TArgs>
{
    public EventResult(TSender sender, TArgs args)
    {
        Sender = sender;
        EventArgs = args;
    }

    public TSender Sender { get; private set; }
    public TArgs EventArgs { get; private set; }
}
public class EventWrapper<TSender, TArgs> : IObservable<EventResult<TSender,TArgs>>
{
    private List<Action<EventResult<TSender, TArgs>>> _attached = new List<Action<EventResult<TSender, TArgs>>>();

    public void Handle(object sender, TArgs e)
    {
        foreach (var action in _attached)
            action(new EventResult<TSender,TArgs>((TSender)sender, e ));
    }

    public void Attach(Action<EventResult<TSender, TArgs>> action)
    {
        _attached.Add(action);
    }
}

Now there is some really naive code here. What would happen if someone called Attach while we were in the middle of handling the event? It would crash, because the list of actions to be called would be modified while it was being iterated over. In general, the code for the EventWrapper feels bad. I am sure it can be done much better, but it works for now.

So the result of the GetMouseDowns method is an EventWrapper that will call the attached methods when the mouse down event occurs, and this is exposed as an IObservable. SelectMany is then called on that IObservable. This is another extension method:

public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
    return new SelectManyObservable<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}

So how on earth did I arrive at this method? Its quite simple actually – I stole its signature from the SelectMany extension method implemented on IEnumerable. The interesting thing about LINQ query comprehensions is that they actually don’t have a dependence on IEnumerable or IQueryable. Instead they depend on the existence of particular methods with the appropriate signature such as Select, SelectMany, Where, GroupBy, etc. By writing these extension methods for IObservable, I can then write query comprehensions against IObservables.

Once its established that my SelectMany borrowed its signature from the appropriate extension method on IEnumerable, there isn’t much more to know about it. You can see that all it simply does is return a SelectManyObservable that wraps the passed arguments. What is the SelectManyObservable exactly? Well, here is the code:

public class SelectManyObservable<TSource, TCollection, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;

    public SelectManyObservable(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    public void Attach(Action<TResult> action)
    {
        _source.Attach(
            s => _collectionSelector(s).Attach(
                c => action(_resultSelector(s, c))
            )
        );            
    }
}

So this class simply wraps an underlying IObservable, and when methods are attached to the SelecyManyObservable, it actually gets attached to the underlying IObservable. The funny thing about this code is that it practically wrote itself. It was basically a matter of just figuring out how the underlying observable, collectionSelector, resultSelector and attached action related to each other and then calling them accordingly. It felt like solving a 4 piece jigsaw puzzle with only 1 solution.

Before the explanation of the query comprehension is complete, there is one more extension method that requires explanation:

public static IObservable<TResult> AsAsyncObservable<TInput, TResult>(this Func<TInput,TResult> funcToObserve,TInput input)
{
    return new AsyncWrapper<TInput, TResult>(funcToObserve, input);
}

This pattern should look familiar. Its like the SelectMany in that it simply returns a wrapper object that takes all the method arguments as parameters. The signature however, wasn’t borrowed from the framework. The point of this method is to allow the user to convert an anonymous delegate or lambda expression into an IObservable. Now I hope you packed the goggles I mentioned in my previous post, because AsyncWrapper is really nasty:

public class AsyncWrapper<TInput, TResult> : IObservable<TResult>
{
    private List<Action<TResult>> _attached = new List<Action<TResult>>();

    public AsyncWrapper(Func<TInput, TResult> funcToObserve, TInput input)
    {
        funcToObserve.BeginInvoke(input, CompletedCallback, null);
    }

    public void Attach(Action<TResult> action)
    {
        _attached.Add(action);
    }

    private void CompletedCallback(IAsyncResult asyncResult)
    {
        TResult calculatedValue = ((Func<TInput, TResult>) ((AsyncResult) asyncResult).AsyncDelegate).EndInvoke(
            asyncResult);

        foreach (var action in _attached)
            action(calculatedValue);
    }
}

It might not seem so bad, until you look at what the constructor is doing. When the AsyncWrapper is created, it fires off the function to observe. Now when that function completes, the callback will be invoked, and as a result all the attached actions will also be invoked. The reason why this is all so heinous is that the actions will have to be attached AFTER the function to observe is invoked, but before it completes. As a result, I am fairly sure that if we changed the slow asynchronous operation to be much faster, then this code would stop working. Surely a better solution can be found, and I do hope to attend to this later.

So that covers all the code that makes my example work. The nice thing about this approach is the composability. It is convenient to be able to work with enumerables using select, where, etc and the same is true for observables. I am really looking forward to getting my hands on the Reactive Framework code. In the mean time, I’ll continue to experiment. In the next post I’ll extend this example, starting with adding support for ‘where’ operations.

Tuesday 26 May 2009

Reacting to the Reactive Framework: Part 1

Last night I managed to get some code working and I was very excited, but it was far too late to start writing a blog post. Here is the code:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

This code is sitting inside a very simple windows form. The form has a button and a textbox. When the button is clicked, a message is written to the textbox about 3 seconds later, relaying the relative X coordinate of the mouse when the button was clicked. The 3 second delay represents some sort of slow asynchronous operation, currently simulated with a simple Thread.Sleep(). Because the slow operation is done on a different thread, we have to use Control.BeginInvoke() to get the textbox to update on the UI thread. GetMouseDowns() is an extension method on Button that helps set the whole thing up by returning an IObservable that is wired to the MouseDown event.

So at this point you might be wondering what this is all about. To get the full story, I encourage you to watch Eric Meijer’s video on the LiveLabs Reactive Framework (direct wmv link). In this very enjoyable presentation, Eric gives us a run down the Reactive Framework, which appears to be a set of libraries that unite various types of “reactive” operations under a common pair of interfaces, IObserver<T> and IObservable<T> which are then integrated into LINQ. I found the presentation to be quite a tease. Erik shows just enough to give you an idea of what the Reactive Framework is all about, but he skims over the implementation details quite lightly. I ended up watching the thing a couple of times, trying to wrap my head around the idea of LINQ query comprehensions (e.g. from x in y select x.ID) operating against non enumerable/queryable types. In particular, I started wondering whether I could implement something like the code from this slide:

image

As you can see, my code at the top is not too dissimilar from Erik’s code here. Erik’s example is a web page with dictionary suggest on a textbox. When the user enters a character into the textbox, the dictionary suggest runs asynchronously against the input entered so far and when a return value is received it is rendered as html.

There is certainly quite a lot regarding the Reactive Framework that I don’t understand. I’ve been unable to find any resources beyond Erik’s talk – certainly the code doesn’t seem available anywhere. So far I don’t even understand how IObserver fits in. Can YOU see anything that might implement IObserver in the above slide? But in any case, the idea is intriguing, and trying to figure out how to do this myself is teaching me quite a bit about LINQ. In the next post, I’ll do my best to explain how the implementation works. Don’t forget your goggles, this is some pretty hacky code coming up.