Thursday, May 27, 2010

Reactive Extensions (Rx) for .NET: A few samples

Reactive Extensions is a framework published by Microsoft DevLabs. The framework's goal is to make event-driven programming easier, by providing observable push collections and tons of helper functions. I'll leave the rest of the plain text selling points to their own web site, though, and provide a few code examples instead.

Example: Calling several methods in parallel, and combining the results into an anonymous structure.

Imagine three possibly long-running methods, Foo, Bar and Baz:

int Foo()
{
    Thread.Sleep(200);
    return 42;
}

string Bar()
{
    Thread.Sleep(500);
    return "The answer is {0}! {1}";
}

string Baz()
{
    return "Awesome!";
}

What you want to do is call all three of these independent methods, collect the results in a struct, and output them to the screen.

With plain .NET code, you'd whip up delegates, BeginInvoke calls and wait handles, such as:

var waitHandles = new List<WaitHandle>();
// Start all three calls on thread pool threads.
Func<int> foo = Foo;
var fooRes = foo.BeginInvoke(null, null);
waitHandles.Add(fooRes.AsyncWaitHandle);
Func<string> bar = Bar;
var barRes = bar.BeginInvoke(null, null);
waitHandles.Add(barRes.AsyncWaitHandle);
Func<string> baz = Baz;
var bazRes = baz.BeginInvoke(null, null);
waitHandles.Add(bazRes.AsyncWaitHandle);
// Block until all three calls have completed.
WaitHandle.WaitAll(waitHandles.ToArray());
// Collect the results here rather than in completion callbacks: a callback
// is not guaranteed to have run by the time its wait handle is signaled.
int fooRet = foo.EndInvoke(fooRes);
string barRet = bar.EndInvoke(barRes);
string bazRet = baz.EndInvoke(bazRes);
Console.Out.WriteLine(barRet, fooRet, bazRet);

Not a pretty sight, that's for certain, and it doesn't even begin to take into account anomalies in the results, or errors reported from any of the methods.

With Rx, the above becomes trivial. The LINQ-like syntax says, in essence, that we want to join the results of three async executions, wait for them to complete, and print the results on success. If an error occurs, we'll show the exception.

Observable.Join(
    Observable.ToAsync<int>(Foo)()
        .And(Observable.ToAsync<string>(Bar)())
        .And(Observable.ToAsync<string>(Baz)())
        .Then((foo, bar, baz) =>
            new { Foo = foo, Bar = bar, Baz = baz })
).Subscribe(
    o => Console.WriteLine(o.Bar, o.Foo, o.Baz),
    e => Console.WriteLine("Exception: {0}", e));

Compared to the BeginInvoke mayhem further up, this is pretty good looking!

Should you wish to asynchronously execute the methods in order, that would be even more hairy without Rx, with several wait handle calls, possible helper methods, and who-knows-what. With Rx, it's actually even more trivial than the parallel execution.

(from foo in Observable.ToAsync<int>(Foo)()
 from bar in Observable.ToAsync<string>(Bar)()
 from baz in Observable.ToAsync<string>(Baz)()
 select new { Foo = foo, Bar = bar, Baz = baz })
    .Subscribe(o => Console.WriteLine(o.Bar, o.Foo, o.Baz));

Key here is the fact that it's *asynchronous*. The call to Subscribe will not block execution, so your application is free to do whatever it pleases while Foo, Bar and Baz execute.
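The query syntax hides where the ordering comes from. The compiler rewrites multiple from clauses into nested SelectMany calls, so the expression that starts Bar sits inside a lambda that only runs once Foo has produced a value. Here's a rough sketch of the same rewrite on plain IEnumerable sequences, so it runs without Rx. It is pull-based, which makes it an analogy for the observable version rather than the real thing, and Step and log are names made up for the illustration. It's written as a top-level program (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<string>();

// Hypothetical stand-in for kicking off a method: records when it is started.
IEnumerable<T> Step<T>(string name, T result)
{
    log.Add("start " + name);
    return new[] { result };
}

var query =
    from foo in Step("Foo", 42)
    from bar in Step("Bar", "The answer is {0}! {1}")
    from baz in Step("Baz", "Awesome!")
    select string.Format(bar, foo, baz);

// Only Foo has been started so far; Bar and Baz live inside SelectMany lambdas.
Console.WriteLine(string.Join(", ", log)); // start Foo

var line = query.Single();
Console.WriteLine(string.Join(", ", log)); // start Foo, start Bar, start Baz
Console.WriteLine(line);                   // The answer is 42! Awesome!
```

The Join/And/Then version, by contrast, evaluates all three ToAsync calls up front as arguments, which is why those run in parallel.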

Example: Event throttling.

Suppose you have an alert generating source, such as:

class EventFiringClass
{
    public event EventHandler<EventArgs> Alert;

    public void TriggerManyAlerts()
    {
        for (int i = 0; i < 200; ++i)
        {
            Thread.Sleep(100);
            if (Alert != null)
            {
                // EventArgs.Empty rather than null, so handlers never receive a null argument.
                Alert(this, EventArgs.Empty);
            }
        }
    }
}

That could obviously generate a ton of alerts, with a mere 100 msec between each one. Adding a notification handler to this, such as

var ec = new EventFiringClass();
ec.Alert += (s, o) => Console.WriteLine("Alert Flood!");
ec.TriggerManyAlerts();

... would print the same message, each time the event was raised - and that's not necessarily something you'd want. Alerts, mouse events, keyboard events - anything that's usually repeated, but not necessarily wanted more than once - all can be easily throttled by Rx.

Adding a time-throttled event handler (they can also be, e.g., count-throttled) to the above event looks like this:

Observable.FromEvent<EventArgs>
    (h => ec.Alert += h,
     h => ec.Alert -= h)
    .Throttle(TimeSpan.FromMilliseconds(500))
    .Subscribe(e => Console.WriteLine("Alert Throttled!"));

This won't output anything until the event stream has been silent for at least half a second.

While this is a fantastic feature for dealing with delayed button presses or mouse moves, the use case for alerts here is intentionally contrived. With the above code, you wouldn't see a single alert if the stream of raised alert events was continuous.
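To see why a busy stream stays silent, here is a small model of the timing rule, written without Rx so it can be run on its own. It works on a list of event timestamps instead of real timers. ThrottledNotifications is a made-up helper, and treating a gap of exactly the quiet period as "silent enough" is my assumption. It is a sketch of the behaviour described above, not Rx's actual implementation. It's written as a top-level program (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: given event timestamps in milliseconds (ascending),
// returns the times at which a throttled subscriber would be notified.
static List<int> ThrottledNotifications(IList<int> eventTimesMs, int quietMs)
{
    var notifications = new List<int>();
    for (var i = 0; i < eventTimesMs.Count; i++)
    {
        var isLast = i == eventTimesMs.Count - 1;
        // An event gets through only if nothing follows it within the quiet period.
        if (isLast || eventTimesMs[i + 1] - eventTimesMs[i] >= quietMs)
        {
            notifications.Add(eventTimesMs[i] + quietMs);
        }
    }
    return notifications;
}

// 200 alerts, 100 ms apart, as in TriggerManyAlerts.
var alertTimes = Enumerable.Range(1, 200).Select(i => i * 100).ToList();
var notified = ThrottledNotifications(alertTimes, 500);

Console.WriteLine(notified.Count); // 1
Console.WriteLine(notified[0]);    // 20500
```

In this model the 200 alerts collapse into a single notification, half a second after the last one fired. Had the loop never ended, there would be none at all.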

For alerts, and many other event sources, what we'd really want is to easily get distinct updates only - I'm interested in the first alert for error "Foo", not necessarily 500 more immediately following.

With Rx, implementing this involves a call to DistinctUntilChanged.

First of all, imagine our alert event now looks like:

public class AlertArgs : EventArgs
{
    public string AlertCode { get; set; }
}
public event EventHandler<AlertArgs> Alert;

[...]
Alert(this, new AlertArgs {AlertCode = "Something exploded!"});

Printing only one distinct alert code at a time then amounts to:

var ec = new EventFiringClass();
Observable.FromEvent<EventFiringClass.AlertArgs>
    (h => ec.Alert += h,
     h => ec.Alert -= h)
    .DistinctUntilChanged(o => o.EventArgs.AlertCode)
    .Subscribe(e => Console.WriteLine("Alert: {0}", e.EventArgs.AlertCode));
ec.TriggerManyAlerts();
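Note that "distinct until changed" is not the same as "distinct": a code that goes away and comes back is reported again. A minimal sketch of that rule, on a plain IEnumerable so it runs without Rx, could look like the following. SkipRepeats and the sample alert codes are made up for the illustration, and this is not how Rx implements the operator. It's written as a top-level program (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: yields an item only when its key differs from the
// key of the item immediately before it.
static IEnumerable<T> SkipRepeats<T, TKey>(IEnumerable<T> source, Func<T, TKey> keySelector)
{
    var comparer = EqualityComparer<TKey>.Default;
    var hasPrevious = false;
    var previousKey = default(TKey);
    foreach (var item in source)
    {
        var key = keySelector(item);
        if (!hasPrevious || !comparer.Equals(previousKey, key))
        {
            yield return item;
        }
        previousKey = key;
        hasPrevious = true;
    }
}

var codes = new[] { "Foo", "Foo", "Foo", "Bar", "Foo", "Foo" };
var distinctRuns = SkipRepeats(codes, code => code).ToList();

Console.WriteLine(string.Join(", ", distinctRuns)); // Foo, Bar, Foo
```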

Example: Asynchronously providing unique Tweets as an Observable collection, using Rx and Linq2Twitter.

The following sample app will use Linq2Twitter to check for new tweets at an interval of 30 seconds, and feed them through an observable Rx collection.

In my WPF test app, I've got it hooked to a list box named TweetList, as can be seen from the Subscribe handler. Also worth noting is the call to ObserveOn, which tells Rx to use the current WPF UI Dispatcher to call the subscription observer. By doing so, we won't have an issue with cross thread UI updates when showing the tweets, nor will we have to monkey around with Dispatcher.BeginInvoke to fix said issue.

private void InitTwitterListing()
{
    var twitterCtx = new TwitterContext();
    var seen = new Dictionary<string, bool>();
    var tweetStream =
        // Return(1L) rather than Return(1): Interval produces long values,
        // and Concat needs both sequences to have the same element type.
        from t in Observable.Return(1L)
            .Concat(Observable.Interval(TimeSpan.FromSeconds(30)))
        from tweet in GetUniqueTweets(twitterCtx, seen)
        select tweet;
    tweetStream.ObserveOn(new DispatcherScheduler(Dispatcher))
        .Subscribe(tweet =>
            TweetList.Items.Add(
                String.Format("[{0}] {1}", tweet.CreatedAt, tweet.Text)));
}

private static IObservable<Status> GetUniqueTweets(
    TwitterContext ctx, Dictionary<string, bool> seenList)
{
    var filter = new Func<Status, bool>(tweet =>
        seenList.ContainsKey(tweet.StatusID) ? false : seenList[tweet.StatusID] = true);
    return (from tweet in ctx.Status
            where tweet.Type == StatusType.User &&
                  tweet.ScreenName == "einaros" &&
                  tweet.Count == 10 &&
                  filter(tweet)
            select tweet).Reverse().ToObservable();
}

The most interesting part here is the tweetStream observable, which is initialized in the InitTwitterListing method. It will keep spewing values indefinitely: the first one immediately (from Observable.Return), then one every 30 seconds. The values actually selected come from the GetUniqueTweets method, which does a fairly straightforward poll through Linq2Twitter.

For each new call to Linq2Twitter, 10 tweets will be requested. All returned tweets will then be filtered (and taken notice of, if not already seen), then reversed and returned as an Rx observable sequence.
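The filter-and-reverse step is easier to follow in isolation. Below is a rough sketch using bare tweet IDs instead of Linq2Twitter Status objects, and a HashSet in place of the Dictionary (HashSet.Add reports whether the ID was new, which does the check and the bookkeeping in one go). TakeUnseen and the IDs are made up, and I'm assuming each poll arrives newest first, which would be the reason for the Reverse. It's written as a top-level program (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one poll: keeps only the IDs not seen before,
// remembers them, and returns them oldest first.
static List<string> TakeUnseen(IEnumerable<string> newestFirst, HashSet<string> seen)
{
    // HashSet.Add returns false when the ID is already present.
    return newestFirst.Where(id => seen.Add(id)).Reverse().ToList();
}

var seen = new HashSet<string>();
var firstPoll = TakeUnseen(new[] { "103", "102", "101" }, seen);
var secondPoll = TakeUnseen(new[] { "105", "104", "103" }, seen);

Console.WriteLine(string.Join(", ", firstPoll));  // 101, 102, 103
Console.WriteLine(string.Join(", ", secondPoll)); // 104, 105
```

The second poll overlaps the first by one tweet, and only the two new ones come out the other end.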

1 comment:

The Reverand said...

I LOVE this. Thanks.
Usually I find query syntax to look out of place in most code, but in this application I'd say it not only fits very well but I would use it instead of LINQ methods.
Again SUPER thanks.