Tuesday, 11 August 2009

Reacting to the Reactive Framework: Part 7

The demo application for today allows the user to experiment with a couple of the mechanisms that I tried for part 6. As you may recall, I wanted an IObservable<int[]> that would return the three selections made from three groups of radio buttons, and only begin returning values once a selection had been made from all three. The demo app allows you to switch between using SelectMany, ForkJoin and CombineLatest and observe how the behavior changes:

The first approach I tried was SelectMany - I’ll use the query comprehension syntax as it is much more readable than calling the SelectMany extension method directly:

from s1 in choiceControl1.OptionSelections
from s2 in choiceControl2.OptionSelections
from s3 in choiceControl3.OptionSelections
select new[] { s1, s2, s3 }

This syntax might be readable, but its also a little misleading. It looks very uniform, like it wouldn’t matter which order you wrote those three ‘from’ statements in. But if you are familiar with LINQ statements like this one, you will know that the ordering IS important. To observe this, activate the SelectMany option in the app above, and then make your selections in reverse order i.e. select an option from group 3, then group 2, then group 1. Notice how you don’t get a selection readout? What’s actually happening here is that the selections made in groups 2 and 3 are ignored until a selection is made in group 1. Then the selections made in group 3 are ignored, until a selection is made in group 2. Then finally selections from group 3 will trigger the observable and raise a result. I could go into more detail on what’s happening here, but is there much point? Its obvious that this implementation is not at all close to the desired result. Lets move on.

When I came across the signature for ForkJoin, I thought I had found what I was looking for:

public static IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources); 

It converts many IObservable<T>’s into one IObservable<T[]>, which is exactly what I want. The bad news is that it only works once. Go ahead and try ForkJoin in the sample app – the status text will update once a selection has been made from all three groups, but it will not update again as the selection continues to change. It did occur to me that perhaps I could use ForkJoin and somehow re-subscribe each time it fires, but I was reluctant to go down that path because it doesn’t feel like a functional (as in functional programming) solution.

The demo app for today uses CombineLatest in the same way as in my previous post:

choiceControl1.OptionSelections
  .CombineLatest(choiceControl2.OptionSelections, (i, j) => new[] { i, j })
  .CombineLatest(choiceControl3.OptionSelections, (array, k) => new[] { array[0], array[1], k }))

This works fine, though its not pretty. I’ll keep my eye out for a better solution, but before I finish up today I want to look at the code behind today’s demo app:

<StackPanel Margin="20">
    <TextBlock Text="Combining Example" HorizontalAlignment="Center" Margin="10" />
    <StackPanel Orientation="Vertical" HorizontalAlignment="Left">
        <TextBlock Text="Select a means of combination:" />
        <RadioButton Name="rbSelectMany" Content="Use SelectMany" />
        <RadioButton Name="rbForkJoin" Content="Use ForkJoin" />
        <RadioButton Name="rbCombineLatest" Content="Use CombineLatest" />                               
        <TextBlock Text="Select an option from each group below, and then experiment with changing your selections." TextWrapping="Wrap" Margin="0,10,0,0"  />
    </StackPanel>            
    <StackPanel Orientation="Horizontal" HorizontalAlignment="Center" >
        <SilverlightApp:ChoiceControl Name="choiceControl1" Heading="Group 1"/>
        <SilverlightApp:ChoiceControl Name="choiceControl2" Heading="Group 2"/>
        <SilverlightApp:ChoiceControl Name="choiceControl3" Heading="Group 3"/>
    </StackPanel>
    <TextBlock Name="statusText" Text="Status Text" TextAlignment="Center"></TextBlock>
</StackPanel>
public MainPage()
{
    InitializeComponent();

    IObservable<IObservable<int[]>> observerSelections = Observable.Merge(
        Observable.FromEvent<RoutedEventArgs>(rbSelectMany, "Checked")
            .Select(_ => from s1 in choiceControl1.OptionSelections
                         from s2 in choiceControl2.OptionSelections
                         from s3 in choiceControl3.OptionSelections
                         select new[] { s1, s2, s3 }),
        Observable.FromEvent<RoutedEventArgs>(rbForkJoin, "Checked")
            .Select(_ => Observable.ForkJoin(choiceControl1.OptionSelections, choiceControl2.OptionSelections, choiceControl3.OptionSelections)),
        Observable.FromEvent<RoutedEventArgs>(rbCombineLatest, "Checked")
            .Select(_ => choiceControl1.OptionSelections
                            .CombineLatest(choiceControl2.OptionSelections, (i, j) => new[] { i, j })
                            .CombineLatest(choiceControl3.OptionSelections, (array, k) => new[] { array[0], array[1], k }))
        );

    IDisposable subscription = null;
    observerSelections.Subscribe(observer =>
                                 {
                                     if (subscription != null)
                                         subscription.Dispose();

                                     choiceControl1.ClearAll();
                                     choiceControl2.ClearAll();
                                     choiceControl3.ClearAll();
                                     statusText.Text = string.Empty;

                                     subscription = observer.Subscribe(UpdateSelectedOptions);                                                                               
                                 });
}

private void UpdateSelectedOptions(int[] values)
{
    statusText.Text = string.Format("Option {0}, Option {1}, Option {2}",
                                    values[0], values[1], values[2]);
}

If you recall in the previous post, I was using the Observable.Merge method to glue three separate events together into one observable, and today I’m using the same technique on the radio buttons that let you swap between the different implementations. The difference in this case is that instead of simply returning an integer, I’m returning an IObservable<int[]>, so the result is an IObservable<IObservable<int[]>>. I then subscribe to this so that each time the selected implementation changes, the existing subscription is disposed, the selections are cleared and a new subscription is instated.

What do you think? Am I being too clever for my own good here? Is an IObservable<IObservable<T>> taking it too far? Let me know in the comments!

The code for this post has been tagged in my github repository.

Sunday, 9 August 2009

Reacting to the Reactive Framework: Part 6

Before I dive into today’s code, there are two things worth mentioning:

  1. If you haven’t already, make haste to Jafar Husain’s blog. Jafar is posting truly interesting examples of using the Reactive Framework - none of the ‘hello world’ fare I’m peddling here.
  2. I mentioned in my previous post that I was having trouble using System.Reactive.dll with non-silverlight projects. Jb Evain has the goods on how you might get around this if you were so inclined. For now I’ll stick with Silverlight as its well suited for including interactive demonstrations in my blog posts.

Today I want to look at combining events using the Reactive Framework (Rx). Here is the demo app for today: (Once again I will remind readers using RSS readers to view this post in a proper browser window so you can see the embedded Silverlight app.)

The idea behind this example is that I am not interested in the selections until an option has been selected from all three groups. From then on, I’d like to be informed whenever the selection changes. This application consists of three instances of a user control, one for each group. Here is the relevant markup and code for the user control:

<StackPanel Margin="20">
    <TextBlock Text="{Binding Heading}" Margin="5" />
    <RadioButton Name="optionButton1" Content="Option 1" />
    <RadioButton Name="optionButton2" Content="Option 2" />
    <RadioButton Name="optionButton3" Content="Option 3" />
</StackPanel>
public ChoiceControl()
{
    InitializeComponent();

    OptionSelections = Observable.Merge(
        Observable.FromEvent<RoutedEventArgs>(optionButton1, "Checked").Select(_ => 1),
        Observable.FromEvent<RoutedEventArgs>(optionButton2, "Checked").Select(_ => 2),
        Observable.FromEvent<RoutedEventArgs>(optionButton3, "Checked").Select(_ => 3)
        );
}

public IObservable<int> OptionSelections { get; private set; }

As you can see, I am creating IObservables from the Checked events of the radio buttons. I’m using the Select method to make observables that simply return the index, and I’m merging those together into one IObservable<int>. So if the user checked option 1, then option 2, then option 3, the OptionSelections would call OnNext on its subscribed observers with values 1, 2 and 3 respectively.

As I mentioned before, this app has 3 instances of this user control:

<StackPanel x:Name="LayoutRoot" Margin="50" Background="Azure" Width="400">
    <StackPanel Orientation="Horizontal" HorizontalAlignment="Center" >
        <SilverlightApp:ChoiceControl Name="choiceControl1" Heading="Choice 1"/>
        <SilverlightApp:ChoiceControl Name="choiceControl2" Heading="Choice 2"/>
        <SilverlightApp:ChoiceControl Name="choiceControl3" Heading="Choice 3"/>
    </StackPanel>
    <TextBlock Name="statusText" TextAlignment="Center"></TextBlock>
</StackPanel>

Now somehow I want to subscribe to the OptionSelections from each control, in such a way that I don’t see any results raised until an option from all three groups have been selected. After a few failed attempts, I stumbled upon this solution:

public MainPage()
{
    InitializeComponent();

    string formatString = "Option {0}, Option {1}, Option {2}";

    var selections = choiceControl1.OptionSelections
        .CombineLatest(choiceControl2.OptionSelections, (i, j) => new[] { i, j })
        .CombineLatest(choiceControl3.OptionSelections, (array, k) => new[] { array[0], array[1], k });

    selections.Subscribe(values => statusText.Text = string.Format(formatString, values[0], values[1], values[2]));            

}

I’m using the CombineLatest extension method, but it will only combine two IObservables so I have to use it twice. The first call returns an IObservable<int[]>, where the array contains two elements. I then combine that with the option selections from choiceControl3, again returning an IObservable<int[]>, this time with all 3 selections. The final step is simply to subscribe to the resulting observable and display the message.

This solution of combining once into an array of two elements and then combining again into an array of three doesn’t seem particularly elegant, but so far I haven’t managed to find anything better. Perhaps I could write a version of CombineLatest that takes more than two observables and hides this ugliness? Its probably not necessary – there’s a good chance that the perfect method already exists, and I’ve just missed it.

In my next post, I’d like to demonstrate some of the failed attempts I made before I found CombineLatest. This demo will give me a chance to create my first IObservable<IObservable<T>>, which I’ve been itching to do :)

Today’s code has been tagged in my github repository.

Sunday, 26 July 2009

Reacting to the Reactive Framework: Part 5

It appears to have gone mostly under the radar, but the Reactive Framework is now out in the wild. The latest release of the Silverlight Toolkit includes System.Reactive.dll, mostly to facilitate the testing of the controls found within the toolkit. Jafar Husain broke the news earlier this week with this excellent post.

Today I want to do a demo that is very similar to the previous demos I’ve done, but of course this time I’ll be using the real Reactive Framework rather than my crappy attempt at implementing it that I’ve inflicted upon you in earlier posts. So far my experiments with using System.Reactive.dll with windows forms have not compiled properly so I’ll switch to using Silverlight for my examples. For now I will continue to focus on exposing button click events as an IObservable. Here is the code:

IObservable<Event<RoutedEventArgs>> clicks = Observable.FromEvent<RoutedEventArgs>(button1, "Click");

int count = 0;
clicks.Subscribe(() => count++);

IObservable<string> messages = from c in clicks
                               select string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : "");
messages.Subscribe(s => button1.Content = s);

And here is the application:

If you are in a RSS reader, you probably won’t be able to see the embedded Silverlight app above. Pop this post out into its own browser tab so that you can revel in the glory of a button that tells you how many times its been clicked!

Lets take a look at what this code is doing. The first step is to convert from an Event to an IObservable:

IObservable<Event<RoutedEventArgs>> clicks = Observable.FromEvent<RoutedEventArgs>(button1, "Click");

The Observable class defines a large swath of extension methods for creating and manipulating IObservables. This overload of the FromEvent method is simple to use, but unfortunately it makes use of a magic string (“click”). There is another overload for FromEvent that allows us to do the same thing with compile-time safety, but it is more verbose. I’ve wrapped it in an extension method:

public static IObservable<Event<RoutedEventArgs>> GetClicks(this Button button)
{
    return Observable.FromEvent((EventHandler<RoutedEventArgs> genericHandler) => new RoutedEventHandler(genericHandler),
                                routedHandler => button.Click += routedHandler,
                                routedHandler => button.Click -= routedHandler);
}

This overload takes three functions as arguments. One to convert from a generic event handler (EventHandler<T>) to the specific event handler that the Click event uses (RoutedEventHandler). The conversion is straightforward, because the two event handlers have the same signature. The other two functions add and remove the handler. This extension method can be used like so:

IObservable<Event<RoutedEventArgs>> clicks = button1.GetClicks();

It would not surprise me if we eventually see extra libraries that define many of these extension methods for us. Ideally they would be unnecessary, and we could simply write:

IObservable<Event<RoutedEventArgs>> clicks = button1.GetObservableEvent(b => b.Click);

But unfortunately the dreaded “The event 'System.Windows.Controls.Primitives.ButtonBase.Click' can only appear on the left hand side of += or –=”  message rears its ugly head. Perhaps in C# 5 the story will be different.

Moving on, the code initializes a count variable and subscribes an increment function to the clicks:

int count = 0;
clicks.Subscribe(() => count++);

You’ll notice that I am simply passing an Action to the Subscribe method, rather than an IObserver. The Reactive Framework won’t force you to use an IObserver if all you want to do is call a function when a new event occurs.

Finally, the code converts the stream of click events into a stream of messages and subscribes to it:

IObservable<string> messages = from c in clicks
                               select string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : "");
messages.Subscribe(s => button1.Content = s);

There are lots of other ways in which to rewrite today’s example code. This version is very short and still quite readable:

int count = 0;
button1.GetClicks().Select(x => ++count)
    .Subscribe(() => button1.Content = string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : ""));

Or I could implement IObserver:

public class CountingButtonObserver : IObserver<Event<RoutedEventArgs>>
{
    private int _count = 0;
    public Button Button { get; set; }

    public void OnNext(Event<RoutedEventArgs> value)
    {
        _count++;
        Button.Content = string.Format("Clicked {0} time{1}", _count, _count > 1 ? "s" : "");
    }

    public void OnError(Exception exception) {}
    public void OnCompleted() {}
}

And use it like so:

button1.GetClicks().Subscribe(new CountingButtonObserver{ Button = button1});

Well, that’s probably enough for today. I’m going to continue to explore the Reactive Framework and see what interesting things I can find. Oh, and let me know if you come across any documentation for it – so far I haven’t found any and it can be quite a struggle to make sense of the various extension methods that are available.

I’ll be pushing my experiments with the Reactive Framework to a github repo.

Sunday, 19 July 2009

Reacting to the Reactive Framework: Part 4

This post is part 4 in a series.

About a week ago another video on the Reactive Framework went up, this time on Channel 9. It was an enjoyable video, though I found it tricky to follow in parts because my understanding of monad theory is virtually non-existent and the discussion does get abstract/theoretical in places. It did however give me a much better understanding of the basic Reactive Framework interfaces (IObserver<T> and IObservable<T>), which meant I had some work to do get my implementation up to scratch!

Side note: I think I started this series badly by getting too complicated too fast, so I’m going to use this as an opportunity to start again.

Here are the relevant interface definitions (I haven’t specified any covariance or contravariance for these types because I am still using C# 3.):

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{        
    void OnNext(T item);
    void OnDone();
    void OnError(Exception e);
}

Hopefully the expected interaction between these two interfaces is relatively self-explanatory. There are a few differences between this pair of interfaces and what I had in the previous posts:

  • Rather than passing a callback function to the Subscribe method (previously called ‘Attach’) an IObserver is passed. As you would expect, the OnNext(item) method is called on the subscribed observer when the IObservable has a new result.
  • The Subscribe method returns an IDisposable. When it is disposed, the IObserver is unsubscribed.
  • When the IObservable has finished (no more results), each of the IObservers are notified using OnDone().
  • If the IObservable encounters an error, the IObservers are notified using OnError(e).

The task today is to capture the stream of MouseDown events on a button as an IObservable<MouseEventArgs>, and then convert that into an IObservable<string> representing messages to be written to a text box. When the MouseDown event occurs, the text box should be updated accordingly. We can start by obtaining a IObservable<MouseEventArgs> like so:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();

GetMouseDowns() is an extension method – its purpose is to wrap the MouseDown event as an IObservable. Its implementation is very simple:

public static IObservable<MouseEventArgs> GetMouseDowns(this Button button)
{
    var wrapper = new EventWrapper<MouseEventArgs>();
    button.MouseDown += wrapper.Handler;
    return wrapper;
}

The EventWrapper class is wired up to the MouseDown event. It has a collection of observers which it notifies when the MouseDown event occurs:

public class EventWrapper<T> : IObservable<T> where T : EventArgs
{
    private readonly ObserverCollection<T> _observers = new ObserverCollection<T>();

    public void Handler(object sender, T eventArgs)
    {
        _observers.NotifyNext(eventArgs);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _observers.Subscribe(observer);
    }
}

Lets skip the implementation of the ObserverCollection for now. I’d prefer to move on to converting this IObservable<MouseEventArgs> to an IObservable<string>:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = from md in mouseEvents
                               select "Mouse down at: " + md.X + "\n";

This works as long as the “Select” LINQ operator has been implemented for IObservable, because the C# compiler converts the above into:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = mouseEvents.Select( (MouseEventArgs md) => "Mouse down at: " + md.X + "\n");

The following extension method has the appropriate signature:

public static IObservable<TResult> Select<T, TResult>(this IObservable<T> observable, Func<T, TResult> func)
{
    return new SelectObservable<T, TResult>(observable, func);            
}

By calling this method, really what we’re saying is that we want a new IObservable that calls its subscribers when the old IObservable did, but first uses the passed function “func” to convert from the old result type to the new result type. In this case, that function is the lambda expression:

(MouseEventArgs md) => "Mouse down at: " + md.X + "\n"

Lets skip the implementation of the SelectObservable, and instead look at consuming the messages. I’ll subscribe an observer that will update the textbox:

messages.Subscribe(new TextBoxUpdater(textBox1));

Here is the implementation for the TextBoxUpdater:

public class TextBoxUpdater : IObserver<string>
{
    private readonly TextBox _textBox;

    public TextBoxUpdater(TextBox textBox)
    {
        _textBox = textBox;
    }

    private void AppendText(string text)
    {
        Action textboxUpdater = () => _textBox.AppendText(text);
        _textBox.BeginInvoke(textboxUpdater);
    }

    public void OnNext(string s)
    {
        AppendText(s);
    }

    public void OnDone()
    {
        AppendText("Done\n");
    }

    public void OnError(Exception e)
    {
        AppendText("Error: " + e.Message);
    }
}
As you can see, it implements the OnNext, OnDone and OnError methods by writing some appropriate text, though in this simple example, I’m not really using OnDone or OnError. If I run this program now and click the button a few times, I get this:

image

That wraps it up for today. The code as of this post is available here.

Sunday, 5 July 2009

Reacting to the Reactive Framework: Part 3

In part one of this series, I introduced the Live Labs Reactive Framework and provided a simple example of what can be achieved when using LINQ for reactive programming. In part two I did my best to explain how my example worked. In today’s post I want to extend the example slightly. Just to reiterate, this series is not about using the Live Labs Reactive Framework as it has not yet been released. I’m exploring the idea of a reactive framework using my own implementation.

Here is my example from the previous posts:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

The idea behind this code is that the IObservable<int> represents a stream of integers that are the result of capturing the mouse down event on a button, doing some “slow processing” (really just a 3 second sleep) asynchronously, and returning the x coordinate from the mouse down event. You can then attach something to that observable like so:

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

The attached code updates the textbox as the values bubble up through the IObservable. Unfortunately its a little more complicated than I would like because the update has to be marshalled onto the UI thread – hence the BeginInvoke call. Today I want to modify this example so that the textbox is only updated when the right mouse button was clicked. The MouseEventArgs can tell me which button was clicked, so really this is just a matter of implementing support for where:

IObservable<int> observable = from md in button1.GetMouseDowns()
                              where md.EventArgs.Button == MouseButtons.Right
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Well this is a very natural way to express what I want – and the implementation is not too hard. First I create the appropriate extension method:

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new WhereObservable<T>(source, predicate);
}

This extension method is just a wrapper call for a class. In this case, its WhereObservable<T> which takes a source IObservable, and a predicate to apply:

public class WhereObservable<T> : IObservable<T>
 {
     private readonly IObservable<T> _source;
     private readonly Func<T, bool> _predicate;

     public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
     {
         _source = source;
         _predicate = predicate;
     }

     public void Attach(Action<T> action)
     {
         _source.Attach(t => { if (_predicate(t)) action(t); });
     }
 }

All I’m doing here is wrapping an inner observable so that I can check to see if the predicate applies before I let the observable event bubble up. That’s all there is to it. My example will now only update the textbox if the right mouse button was clicked.

My work on this project is available on github. To get the code as it was at the time this post was written, use this tag.

Sunday, 31 May 2009

Reacting to the Reactive Framework: Part 2

DISCLAIMER: The code in this post was the result of a big hackfest with the single goal of getting a particular use case working. I am telling you right now that I paid no attention to potential concurrency problems and that if you try to reuse any of this stuff you are just asking for trouble.

In the previous post, I explained how Eric Meijer’s recent Lang.NET presentation on the LiveLabs Reactive Framework piqued my interest. Wondering how the framework works, I decided to have a go at emulating some of the basic functionality it appears to provide. This code demonstrates the functionality I’ve implemented so far:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

Today I am going to work my way through the query comprehension you can see here and do my best to explain how I got this working. Perhaps a good place to start would be to convert the query comprehension into a method chain in the same way the C# compiler does for us under the covers. Resharper can do this automatically, which is convenient:

IObservable<int> observable = button1.GetMouseDowns().SelectMany(
                    md => slowOperation.AsAsyncObservable(md.EventArgs), 
                    (md, x) => x
                    );

The first call here is to an extension method called GetMouseDowns:

public static IObservable<EventResult<Button, MouseEventArgs>> GetMouseDowns(this Button b)
{
    var wrapper = new EventWrapper<Button, MouseEventArgs>();
    b.MouseDown += wrapper.Handle;
    return wrapper;
}

So far, this is relatively straightforward. The GetMouseDowns extension method creates an EventWrapper, and wires that EventWrapper up to the MouseDown event on the button. You can see that the wrapper is being returned as an IObservable of an EventResult. Here is the code for these:

public interface IObservable<T>
{
    void Attach(Action<T> action);
}
public class EventResult<TSender, TArgs>
{
    public EventResult(TSender sender, TArgs args)
    {
        Sender = sender;
        EventArgs = args;
    }

    public TSender Sender { get; private set; }
    public TArgs EventArgs { get; private set; }
}
public class EventWrapper<TSender, TArgs> : IObservable<EventResult<TSender,TArgs>>
{
    private List<Action<EventResult<TSender, TArgs>>> _attached = new List<Action<EventResult<TSender, TArgs>>>();

    public void Handle(object sender, TArgs e)
    {
        foreach (var action in _attached)
            action(new EventResult<TSender,TArgs>((TSender)sender, e ));
    }

    public void Attach(Action<EventResult<TSender, TArgs>> action)
    {
        _attached.Add(action);
    }
}

Now there is some really naive code here. What would happen if someone called Attach while we were in the middle of handling the event? It would crash, because the list of actions to be called would be modified while it was being iterated over. In general, the code for the EventWrapper feels bad. I am sure it can be done much better, but it works for now.

So the result of the GetMouseDowns method is an EventWrapper that will call the attached methods when the mouse down event occurs, and this is exposed as an IObservable. SelectMany is then called on that IObservable. This is another extension method:

public static IObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
    return new SelectManyObservable<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}

So how on earth did I arrive at this method? Its quite simple actually – I stole its signature from the SelectMany extension method implemented on IEnumerable. The interesting thing about LINQ query comprehensions is that they actually don’t have a dependence on IEnumerable or IQueryable. Instead they depend on the existence of particular methods with the appropriate signature such as Select, SelectMany, Where, GroupBy, etc. By writing these extension methods for IObservable, I can then write query comprehensions against IObservables.

Once its established that my SelectMany borrowed its signature from the appropriate extension method on IEnumerable, there isn’t much more to know about it. You can see that all it simply does is return a SelectManyObservable that wraps the passed arguments. What is the SelectManyObservable exactly? Well, here is the code:

public class SelectManyObservable<TSource, TCollection, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, IObservable<TCollection>> _collectionSelector;
    private readonly Func<TSource, TCollection, TResult> _resultSelector;

    public SelectManyObservable(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
    {
        _source = source;
        _collectionSelector = collectionSelector;
        _resultSelector = resultSelector;
    }

    public void Attach(Action<TResult> action)
    {
        _source.Attach(
            s => _collectionSelector(s).Attach(
                c => action(_resultSelector(s, c))
            )
        );            
    }
}

So this class simply wraps an underlying IObservable, and when methods are attached to the SelecyManyObservable, it actually gets attached to the underlying IObservable. The funny thing about this code is that it practically wrote itself. It was basically a matter of just figuring out how the underlying observable, collectionSelector, resultSelector and attached action related to each other and then calling them accordingly. It felt like solving a 4 piece jigsaw puzzle with only 1 solution.

Before the explanation of the query comprehension is complete, there is one more extension method that requires explanation:

public static IObservable<TResult> AsAsyncObservable<TInput, TResult>(this Func<TInput,TResult> funcToObserve,TInput input)
{
    return new AsyncWrapper<TInput, TResult>(funcToObserve, input);
}

This pattern should look familiar. Its like the SelectMany in that it simply returns a wrapper object that takes all the method arguments as parameters. The signature however, wasn’t borrowed from the framework. The point of this method is to allow the user to convert an anonymous delegate or lambda expression into an IObservable. Now I hope you packed the goggles I mentioned in my previous post, because AsyncWrapper is really nasty:

public class AsyncWrapper<TInput, TResult> : IObservable<TResult>
{
    private List<Action<TResult>> _attached = new List<Action<TResult>>();

    public AsyncWrapper(Func<TInput, TResult> funcToObserve, TInput input)
    {
        funcToObserve.BeginInvoke(input, CompletedCallback, null);
    }

    public void Attach(Action<TResult> action)
    {
        _attached.Add(action);
    }

    private void CompletedCallback(IAsyncResult asyncResult)
    {
        TResult calculatedValue = ((Func<TInput, TResult>) ((AsyncResult) asyncResult).AsyncDelegate).EndInvoke(
            asyncResult);

        foreach (var action in _attached)
            action(calculatedValue);
    }
}

It might not seem so bad, until you look at what the constructor is doing. When the AsyncWrapper is created, it fires off the function to observe. Now when that function completes, the callback will be invoked, and as a result all the attached actions will also be invoked. The reason why this is all so heinous is that the actions will have to be attached AFTER the function to observe is invoked, but before it completes. As a result, I am fairly sure that if we changed the slow asynchronous operation to be much faster, then this code would stop working. Surely a better solution can be found, and I do hope to attend to this later.

So that covers all the code that makes my example work. The nice thing about this approach is the composability. It is convenient to be able to work with enumerables using select, where, etc and the same is true for observables. I am really looking forward to getting my hands on the Reactive Framework code. In the mean time, I’ll continue to experiment. In the next post I’ll extend this example, starting with adding support for ‘where’ operations.

Tuesday, 26 May 2009

Reacting to the Reactive Framework: Part 1

Last night I managed to get some code working and I was very excited, but it was far too late to start writing a blog post. Here is the code:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

This code is sitting inside a very simple windows form. The form has a button and a textbox. When the button is clicked, a message is written to the textbox about 3 seconds later, relaying the relative X coordinate of the mouse when the button was clicked. The 3 second delay represents some sort of slow asynchronous operation, currently simulated with a simple Thread.Sleep(). Because the slow operation is done on a different thread, we have to use Control.BeginInvoke() to get the textbox to update on the UI thread. GetMouseDowns() is an extension method on Button that helps set the whole thing up by returning an IObservable that is wired to the MouseDown event.

So at this point you might be wondering what this is all about. To get the full story, I encourage you to watch Eric Meijer’s video on the LiveLabs Reactive Framework (direct wmv link). In this very enjoyable presentation, Eric gives us a run down the Reactive Framework, which appears to be a set of libraries that unite various types of “reactive” operations under a common pair of interfaces, IObserver<T> and IObservable<T> which are then integrated into LINQ. I found the presentation to be quite a tease. Erik shows just enough to give you an idea of what the Reactive Framework is all about, but he skims over the implementation details quite lightly. I ended up watching the thing a couple of times, trying to wrap my head around the idea of LINQ query comprehensions (e.g. from x in y select x.ID) operating against non enumerable/queryable types. In particular, I started wondering whether I could implement something like the code from this slide:

image

As you can see, my code at the top is not too dissimilar from Erik’s code here. Erik’s example is a web page with dictionary suggest on a textbox. When the user enters a character into the textbox, the dictionary suggest runs asynchronously against the input entered so far and when a return value is received it is rendered as html.

There is certainly quite a lot regarding the Reactive Framework that I don’t understand. I’ve been unable to find any resources beyond Erik’s talk – certainly the code doesn’t seem available anywhere. So far I don’t even understand how IObserver fits in. Can YOU see anything that might implement IObserver in the above slide? But in any case, the idea is intriguing, and trying to figure out how to do this myself is teaching me quite a bit about LINQ. In the next post, I’ll do my best to explain how the implementation works. Don’t forget your goggles, this is some pretty hacky code coming up.