Tuesday 11 August 2009

Reacting to the Reactive Framework: Part 7

The demo application for today lets you experiment with a couple of the mechanisms that I tried in part 6. As you may recall, I wanted an IObservable<int[]> that would return the three selections made from three groups of radio buttons, and only begin returning values once a selection had been made from all three. The demo app lets you switch between SelectMany, ForkJoin and CombineLatest and observe how the behavior changes.

The first approach I tried was SelectMany - I’ll use the query comprehension syntax as it is much more readable than calling the SelectMany extension method directly:

from s1 in choiceControl1.OptionSelections
from s2 in choiceControl2.OptionSelections
from s3 in choiceControl3.OptionSelections
select new[] { s1, s2, s3 }

This syntax might be readable, but it’s also a little misleading. It looks very uniform, as if it wouldn’t matter which order you wrote those three ‘from’ clauses in. But if you are familiar with LINQ queries like this one, you will know that the ordering IS important.

To observe this, activate the SelectMany option in the app above, and then make your selections in reverse order, i.e. select an option from group 3, then group 2, then group 1. Notice how you don’t get a selection readout? What’s actually happening is that the selections made in groups 2 and 3 are ignored until a selection is made in group 1. After that, selections in group 3 are still ignored until a selection is made in group 2. Only then will a selection from group 3 trigger the observable and produce a result. I could go into more detail on what’s happening here, but is there much point? It’s obvious that this implementation is not at all close to the desired result. Let’s move on.
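The ordering effect can be reproduced outside the demo app. The following is only a minimal sketch: it assumes a console project referencing the current System.Reactive package (not the 2009 preview build this post was written against), and the three Subject<int> instances are hypothetical stand-ins for the OptionSelections observables.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class SelectManyOrderingDemo
{
    static void Main()
    {
        // Hypothetical stand-ins for the three OptionSelections streams.
        var group1 = new Subject<int>();
        var group2 = new Subject<int>();
        var group3 = new Subject<int>();

        // Same shape as the query in the post.
        var query = from s1 in group1
                    from s2 in group2
                    from s3 in group3
                    select new[] { s1, s2, s3 };

        using (query.Subscribe(v => Console.WriteLine(string.Join(", ", v))))
        {
            group3.OnNext(1); // ignored: nothing is listening to group3 yet
            group2.OnNext(2); // ignored: nothing is listening to group2 yet
            group1.OnNext(3); // the query now starts listening to group2
            group2.OnNext(4); // the query now starts listening to group3
            group3.OnNext(5); // prints "3, 4, 5"
        }
    }
}
```

The first two values are lost because each ‘from’ clause only subscribes to its source once the clause before it has produced a value.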

When I came across the signature for ForkJoin, I thought I had found what I was looking for:

public static IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources); 

It converts many IObservable<T>s into one IObservable<T[]>, which is exactly what I want. The bad news is that it only works once. Go ahead and try ForkJoin in the sample app – the status text will update once a selection has been made from all three groups, but it will not update again as the selection continues to change. It did occur to me that perhaps I could use ForkJoin and somehow re-subscribe each time it fires, but I was reluctant to go down that path because it doesn’t feel like a functional (as in functional programming) solution.

The demo app for today uses CombineLatest in the same way as in my previous post:

choiceControl1.OptionSelections
  .CombineLatest(choiceControl2.OptionSelections, (i, j) => new[] { i, j })
  .CombineLatest(choiceControl3.OptionSelections, (array, k) => new[] { array[0], array[1], k })
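To see how the chained calls behave in isolation, here is a minimal sketch of the same pattern. It assumes a console project referencing the current System.Reactive package, and the three Subject<int> instances are hypothetical stand-ins for the OptionSelections observables.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class CombineLatestDemo
{
    static void Main()
    {
        // Hypothetical stand-ins for the three OptionSelections streams.
        var group1 = new Subject<int>();
        var group2 = new Subject<int>();
        var group3 = new Subject<int>();

        // Pair up the first two sources, then fold in the third.
        var selections = group1
            .CombineLatest(group2, (i, j) => new[] { i, j })
            .CombineLatest(group3, (array, k) => new[] { array[0], array[1], k });

        using (selections.Subscribe(v => Console.WriteLine(string.Join(", ", v))))
        {
            group3.OnNext(1); // remembered, nothing printed yet
            group2.OnNext(2); // remembered, nothing printed yet
            group1.OnNext(3); // prints "3, 2, 1"
            group2.OnNext(4); // prints "3, 4, 1"
        }
    }
}
```

Unlike the SelectMany query, the order of the selections does not matter here: each source’s latest value is retained, and once all three have a value every further change produces a new array.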

This works fine, though it’s not pretty. I’ll keep my eye out for a better solution, but before I finish up today I want to look at the code behind today’s demo app:

<StackPanel Margin="20">
    <TextBlock Text="Combining Example" HorizontalAlignment="Center" Margin="10" />
    <StackPanel Orientation="Vertical" HorizontalAlignment="Left">
        <TextBlock Text="Select a means of combination:" />
        <RadioButton Name="rbSelectMany" Content="Use SelectMany" />
        <RadioButton Name="rbForkJoin" Content="Use ForkJoin" />
        <RadioButton Name="rbCombineLatest" Content="Use CombineLatest" />
        <TextBlock Text="Select an option from each group below, and then experiment with changing your selections." TextWrapping="Wrap" Margin="0,10,0,0" />
    </StackPanel>
    <StackPanel Orientation="Horizontal" HorizontalAlignment="Center">
        <SilverlightApp:ChoiceControl Name="choiceControl1" Heading="Group 1"/>
        <SilverlightApp:ChoiceControl Name="choiceControl2" Heading="Group 2"/>
        <SilverlightApp:ChoiceControl Name="choiceControl3" Heading="Group 3"/>
    </StackPanel>
    <TextBlock Name="statusText" Text="Status Text" TextAlignment="Center"></TextBlock>
</StackPanel>

public MainPage()
{
    InitializeComponent();

    // Each radio button's Checked event is mapped to the observable that implements that option.
    IObservable<IObservable<int[]>> observerSelections = Observable.Merge(
        Observable.FromEvent<RoutedEventArgs>(rbSelectMany, "Checked")
            .Select(_ => from s1 in choiceControl1.OptionSelections
                         from s2 in choiceControl2.OptionSelections
                         from s3 in choiceControl3.OptionSelections
                         select new[] { s1, s2, s3 }),
        Observable.FromEvent<RoutedEventArgs>(rbForkJoin, "Checked")
            .Select(_ => Observable.ForkJoin(choiceControl1.OptionSelections, choiceControl2.OptionSelections, choiceControl3.OptionSelections)),
        Observable.FromEvent<RoutedEventArgs>(rbCombineLatest, "Checked")
            .Select(_ => choiceControl1.OptionSelections
                            .CombineLatest(choiceControl2.OptionSelections, (i, j) => new[] { i, j })
                            .CombineLatest(choiceControl3.OptionSelections, (array, k) => new[] { array[0], array[1], k })));

    IDisposable subscription = null;
    observerSelections.Subscribe(observer =>
                                 {
                                     // Drop the subscription to the previously selected implementation.
                                     if (subscription != null)
                                     {
                                         subscription.Dispose();
                                     }

                                     statusText.Text = string.Empty;

                                     subscription = observer.Subscribe(UpdateSelectedOptions);
                                 });
}

private void UpdateSelectedOptions(int[] values)
{
    statusText.Text = string.Format("Option {0}, Option {1}, Option {2}",
                                    values[0], values[1], values[2]);
}

If you recall, in the previous post I used the Observable.Merge method to glue three separate events together into one observable, and today I’m using the same technique on the radio buttons that let you swap between the different implementations. The difference in this case is that instead of simply returning an integer, each event returns an IObservable<int[]>, so the result is an IObservable<IObservable<int[]>>. I then subscribe to this so that each time the selected implementation changes, the existing subscription is disposed, the status text is cleared and a new subscription is created.
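For comparison, current releases of Rx include a Switch operator that flattens an IObservable<IObservable<T>> by always following the most recent inner observable and unsubscribing from the previous one. Whether it was available in the build used for this post is not something the post establishes, so treat the following as a sketch under that assumption. It targets a console project referencing the current System.Reactive package, and the subjects are hypothetical stand-ins for the three implementations.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class SwitchDemo
{
    static void Main()
    {
        // Hypothetical stand-ins: each subject plays the role of one implementation.
        var first = new Subject<int[]>();
        var second = new Subject<int[]>();

        // Plays the role of the merged radio button events.
        var implementations = new Subject<IObservable<int[]>>();

        using (implementations.Switch()
                   .Subscribe(v => Console.WriteLine(string.Join(", ", v))))
        {
            implementations.OnNext(first);
            first.OnNext(new[] { 1, 2, 3 });  // prints "1, 2, 3"

            implementations.OnNext(second);   // Switch unsubscribes from 'first'
            first.OnNext(new[] { 4, 5, 6 });  // ignored
            second.OnNext(new[] { 7, 8, 9 }); // prints "7, 8, 9"
        }
    }
}
```

This covers only the dispose-and-resubscribe part of the handler; clearing the status text when the implementation changes would still need its own step.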

What do you think? Am I being too clever for my own good here? Is an IObservable<IObservable<T>> taking it too far? Let me know in the comments!

The code for this post has been tagged in my GitHub repository.
