Sunday 26 July 2009

Reacting to the Reactive Framework: Part 5

It appears to have gone mostly under the radar, but the Reactive Framework is now out in the wild. The latest release of the Silverlight Toolkit includes System.Reactive.dll, mostly to facilitate the testing of the controls found within the toolkit. Jafar Husain broke the news earlier this week with this excellent post.

Today I want to do a demo that is very similar to the previous demos I’ve done, but of course this time I’ll be using the real Reactive Framework rather than my crappy attempt at implementing it that I’ve inflicted upon you in earlier posts. So far my experiments with using System.Reactive.dll with windows forms have not compiled properly so I’ll switch to using Silverlight for my examples. For now I will continue to focus on exposing button click events as an IObservable. Here is the code:

IObservable<Event<RoutedEventArgs>> clicks = Observable.FromEvent<RoutedEventArgs>(button1, "Click");

int count = 0;
clicks.Subscribe(() => count++);

IObservable<string> messages = from c in clicks
                               select string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : "");
messages.Subscribe(s => button1.Content = s);

And here is the application:

If you are in a RSS reader, you probably won’t be able to see the embedded Silverlight app above. Pop this post out into its own browser tab so that you can revel in the glory of a button that tells you how many times its been clicked!

Lets take a look at what this code is doing. The first step is to convert from an Event to an IObservable:

IObservable<Event<RoutedEventArgs>> clicks = Observable.FromEvent<RoutedEventArgs>(button1, "Click");

The Observable class defines a large swath of extension methods for creating and manipulating IObservables. This overload of the FromEvent method is simple to use, but unfortunately it makes use of a magic string (“click”). There is another overload for FromEvent that allows us to do the same thing with compile-time safety, but it is more verbose. I’ve wrapped it in an extension method:

public static IObservable<Event<RoutedEventArgs>> GetClicks(this Button button)
{
    return Observable.FromEvent((EventHandler<RoutedEventArgs> genericHandler) => new RoutedEventHandler(genericHandler),
                                routedHandler => button.Click += routedHandler,
                                routedHandler => button.Click -= routedHandler);
}

This overload takes three functions as arguments. One to convert from a generic event handler (EventHandler<T>) to the specific event handler that the Click event uses (RoutedEventHandler). The conversion is straightforward, because the two event handlers have the same signature. The other two functions add and remove the handler. This extension method can be used like so:

IObservable<Event<RoutedEventArgs>> clicks = button1.GetClicks();

It would not surprise me if we eventually see extra libraries that define many of these extension methods for us. Ideally they would be unnecessary, and we could simply write:

IObservable<Event<RoutedEventArgs>> clicks = button1.GetObservableEvent(b => b.Click);

But unfortunately the dreaded “The event 'System.Windows.Controls.Primitives.ButtonBase.Click' can only appear on the left hand side of += or –=”  message rears its ugly head. Perhaps in C# 5 the story will be different.

Moving on, the code initializes a count variable and subscribes an increment function to the clicks:

int count = 0;
clicks.Subscribe(() => count++);

You’ll notice that I am simply passing an Action to the Subscribe method, rather than an IObserver. The Reactive Framework won’t force you to use an IObserver if all you want to do is call a function when a new event occurs.

Finally, the code converts the stream of click events into a stream of messages and subscribes to it:

IObservable<string> messages = from c in clicks
                               select string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : "");
messages.Subscribe(s => button1.Content = s);

There are lots of other ways in which to rewrite today’s example code. This version is very short and still quite readable:

int count = 0;
button1.GetClicks().Select(x => ++count)
    .Subscribe(() => button1.Content = string.Format("Clicked {0} time{1}", count, count > 1 ? "s" : ""));

Or I could implement IObserver:

public class CountingButtonObserver : IObserver<Event<RoutedEventArgs>>
{
    private int _count = 0;
    public Button Button { get; set; }

    public void OnNext(Event<RoutedEventArgs> value)
    {
        _count++;
        Button.Content = string.Format("Clicked {0} time{1}", _count, _count > 1 ? "s" : "");
    }

    public void OnError(Exception exception) {}
    public void OnCompleted() {}
}

And use it like so:

button1.GetClicks().Subscribe(new CountingButtonObserver{ Button = button1});

Well, that’s probably enough for today. I’m going to continue to explore the Reactive Framework and see what interesting things I can find. Oh, and let me know if you come across any documentation for it – so far I haven’t found any and it can be quite a struggle to make sense of the various extension methods that are available.

I’ll be pushing my experiments with the Reactive Framework to a github repo.

Sunday 19 July 2009

Reacting to the Reactive Framework: Part 4

This post is part 4 in a series.

About a week ago another video on the Reactive Framework went up, this time on Channel 9. It was an enjoyable video, though I found it tricky to follow in parts because my understanding of monad theory is virtually non-existent and the discussion does get abstract/theoretical in places. It did however give me a much better understanding of the basic Reactive Framework interfaces (IObserver<T> and IObservable<T>), which meant I had some work to do get my implementation up to scratch!

Side note: I think I started this series badly by getting too complicated too fast, so I’m going to use this as an opportunity to start again.

Here are the relevant interface definitions (I haven’t specified any covariance or contravariance for these types because I am still using C# 3.):

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<T>
{        
    void OnNext(T item);
    void OnDone();
    void OnError(Exception e);
}

Hopefully the expected interaction between these two interfaces is relatively self-explanatory. There are a few differences between this pair of interfaces and what I had in the previous posts:

  • Rather than passing a callback function to the Subscribe method (previously called ‘Attach’) an IObserver is passed. As you would expect, the OnNext(item) method is called on the subscribed observer when the IObservable has a new result.
  • The Subscribe method returns an IDisposable. When it is disposed, the IObserver is unsubscribed.
  • When the IObservable has finished (no more results), each of the IObservers are notified using OnDone().
  • If the IObservable encounters an error, the IObservers are notified using OnError(e).

The task today is to capture the stream of MouseDown events on a button as an IObservable<MouseEventArgs>, and then convert that into an IObservable<string> representing messages to be written to a text box. When the MouseDown event occurs, the text box should be updated accordingly. We can start by obtaining a IObservable<MouseEventArgs> like so:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();

GetMouseDowns() is an extension method – its purpose is to wrap the MouseDown event as an IObservable. Its implementation is very simple:

public static IObservable<MouseEventArgs> GetMouseDowns(this Button button)
{
    var wrapper = new EventWrapper<MouseEventArgs>();
    button.MouseDown += wrapper.Handler;
    return wrapper;
}

The EventWrapper class is wired up to the MouseDown event. It has a collection of observers which it notifies when the MouseDown event occurs:

public class EventWrapper<T> : IObservable<T> where T : EventArgs
{
    private readonly ObserverCollection<T> _observers = new ObserverCollection<T>();

    public void Handler(object sender, T eventArgs)
    {
        _observers.NotifyNext(eventArgs);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _observers.Subscribe(observer);
    }
}

Lets skip the implementation of the ObserverCollection for now. I’d prefer to move on to converting this IObservable<MouseEventArgs> to an IObservable<string>:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = from md in mouseEvents
                               select "Mouse down at: " + md.X + "\n";

This works as long as the “Select” LINQ operator has been implemented for IObservable, because the C# compiler converts the above into:

IObservable<MouseEventArgs> mouseEvents = button1.GetMouseDowns();
IObservable<string> messages = mouseEvents.Select( (MouseEventArgs md) => "Mouse down at: " + md.X + "\n");

The following extension method has the appropriate signature:

public static IObservable<TResult> Select<T, TResult>(this IObservable<T> observable, Func<T, TResult> func)
{
    return new SelectObservable<T, TResult>(observable, func);            
}

By calling this method, really what we’re saying is that we want a new IObservable that calls its subscribers when the old IObservable did, but first uses the passed function “func” to convert from the old result type to the new result type. In this case, that function is the lambda expression:

(MouseEventArgs md) => "Mouse down at: " + md.X + "\n"

Lets skip the implementation of the SelectObservable, and instead look at consuming the messages. I’ll subscribe an observer that will update the textbox:

messages.Subscribe(new TextBoxUpdater(textBox1));

Here is the implementation for the TextBoxUpdater:

public class TextBoxUpdater : IObserver<string>
{
    private readonly TextBox _textBox;

    public TextBoxUpdater(TextBox textBox)
    {
        _textBox = textBox;
    }

    private void AppendText(string text)
    {
        Action textboxUpdater = () => _textBox.AppendText(text);
        _textBox.BeginInvoke(textboxUpdater);
    }

    public void OnNext(string s)
    {
        AppendText(s);
    }

    public void OnDone()
    {
        AppendText("Done\n");
    }

    public void OnError(Exception e)
    {
        AppendText("Error: " + e.Message);
    }
}
As you can see, it implements the OnNext, OnDone and OnError methods by writing some appropriate text, though in this simple example, I’m not really using OnDone or OnError. If I run this program now and click the button a few times, I get this:

image

That wraps it up for today. The code as of this post is available here.

Sunday 5 July 2009

Reacting to the Reactive Framework: Part 3

In part one of this series, I introduced the Live Labs Reactive Framework and provided a simple example of what can be achieved when using LINQ for reactive programming. In part two I did my best to explain how my example worked. In today’s post I want to extend the example slightly. Just to reiterate, this series is not about using the Live Labs Reactive Framework as it has not yet been released. I’m exploring the idea of a reactive framework using my own implementation.

Here is my example from the previous posts:

Func<MouseEventArgs, int> slowOperation = args =>
{
    System.Threading.Thread.Sleep(3000);
    return args.X;
};

IObservable<int> observable = from md in button1.GetMouseDowns()
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

The idea behind this code is that the IObservable<int> represents a stream of integers that are the result of capturing the mouse down event on a button, doing some “slow processing” (really just a 3 second sleep) asynchronously, and returning the x coordinate from the mouse down event. You can then attach something to that observable like so:

Action<string> textboxUpdater = s => textBox1.AppendText(s);
observable.Attach(x => textBox1.BeginInvoke(textboxUpdater, "Mouse down: " + x + "\n"));

The attached code updates the textbox as the values bubble up through the IObservable. Unfortunately its a little more complicated than I would like because the update has to be marshalled onto the UI thread – hence the BeginInvoke call. Today I want to modify this example so that the textbox is only updated when the right mouse button was clicked. The MouseEventArgs can tell me which button was clicked, so really this is just a matter of implementing support for where:

IObservable<int> observable = from md in button1.GetMouseDowns()
                              where md.EventArgs.Button == MouseButtons.Right
                              from x in slowOperation.AsAsyncObservable(md.EventArgs)
                              select x;

Well this is a very natural way to express what I want – and the implementation is not too hard. First I create the appropriate extension method:

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
    return new WhereObservable<T>(source, predicate);
}

This extension method is just a wrapper call for a class. In this case, its WhereObservable<T> which takes a source IObservable, and a predicate to apply:

public class WhereObservable<T> : IObservable<T>
 {
     private readonly IObservable<T> _source;
     private readonly Func<T, bool> _predicate;

     public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
     {
         _source = source;
         _predicate = predicate;
     }

     public void Attach(Action<T> action)
     {
         _source.Attach(t => { if (_predicate(t)) action(t); });
     }
 }

All I’m doing here is wrapping an inner observable so that I can check to see if the predicate applies before I let the observable event bubble up. That’s all there is to it. My example will now only update the textbox if the right mouse button was clicked.

My work on this project is available on github. To get the code as it was at the time this post was written, use this tag.