Reactive Extensions for .NET – Teil 1

by Bernhard Wurm 18. October 2010 15:20

Ich bin gestern auf etwas interessantes gestoßen: Reactive Extensions for .NET. Hört sich spannend an.

Hierbei handelt es sich um ein Lab-Projekt von Microsoft Research mit dem Ziel ein einheitliches, einfaches Programmiermodell für Ansynchrone und Event-basierte Implementierung. Bzw. ein Programmiermodell um hier eine einfachere Verknüpfung dieser beiden (Asynchron und Event-Based) Welten.

Also habe ich  mir RX für .NET 4 von der Webseite heruntergeladen und installiert und meine ersten Gehversuche gestartet.

Demoprojekt: AutoComplete-Textbox

Also man nehme eine Textbox “txtEnter”, sowie eine Listbox, in welche die Ergebnisse geschrieben werden sollen “lstBx”.

Die klassischen Anforderungen:

  1. Während der Benutzer in eine Textbox Suchbegriffe eintippt, wird die Listbox mit  Suchergebnissen gefüllt.
  2. Die Suchergebnisse werden vom Server geladen. Daher kann es zu einer Zeitverzögerung kommen. Die Benutzeroberfläche muss jedoch reaktiv bleiben!
  3. Um die Serverlast in Grenzen zu halten soll dieser erst ab dem 6. eingegebenen Zeichen befragt werden
  4. Um die Serverlast in Grenzen zu halten, soll der Server erst befragt werden, wenn der Benutzer eine Tipppause (von 500ms) macht.

Nun ja, das sind klassische Anforderungen. Und nun wollen wir sehen, wie dies mittels RX gelöst werden kann.

Die Projektreferenzen

Es sind nach der Installation folgende neue Assemblies verfügbar, welche referenziert werden

  • System.CoreEx
  • System.Reactive
  • System.Interactive

Simulation der Server-Methode

Da es sich um ein Demo-Sample handelt wird hier die Servermethode mit einer klassichem Thread-Sleep-Methode simuliert:

image

RX im Einsatz

Punkt 1: Auf das Text-Changed Event der Textbox registrieren:

var input = (from evt in Observable.FromEvent<TextChangedEventArgs>(this.txtEnter, "TextChanged") select ((TextBox)evt.Sender).Text);

RX ist auf Observable und Observer bzw. auf den resultierenden Interfaces IObservable<T> und IObserver<T> aufgesetzt. Mittels den Factory-Klassen können entsprechende Instanzen generiert werden. Dabei stehen verschiedenste Möglichkeiten zur Registrierung zur Verfügung wie hier z.B.: FromEvent. Hier wird einfach das Objekt, sowie das Event (als String) angegeben. Mittels reflection wird sich auf das Event registriert.

Das Objekt input ist nun ein IObserveable<string>.

Mittels LINQ bzw. Lambda Expressions können nun diverseste Einschränkungen durchgeführt werden wie z.B.: die Anforderung (3):

image

Auf die Anforderung (4) lässt sich einfachst umsetzen:
image

Nun denn. Man Abonniere jetzt konkret dieses Observable-Objekt mittels Subscribe(Action). Die tatsächliche Implementierung dieser Action, welche aufgerufen wird, wenn das Event auftritt, kann natürlich anonym durchgeführt werden:

image

Die Methode  GetResult (also die Server-Simulation) legt den Thrad für 1sec schlafen. Das heißt, damit die Benutzeroberfläche reaktiv bleibt, ist ein Hintergrundthread notwendig.

image

Eine kleine Unschönheit, mit der man sich noch herumschlagen muss ist, dass mittels Thorttle ein eigener Thread geöffnet wird. Damit also in der letzten Subscriber-Methode, welche die Action aufruft, sobald Observable.Start() das Ergebnis liefert, nicht auf die Oberflächenelemente zugegriffen werden kann. Also ist hier noch ein extra Dispatch nötig:

image

Fazit

Nun ja, einerseits sieht dies schon etwas komplex aus. Andererseits wie sieht die Alternative aus :-).

Skip und Throttle ist einfach, nur leider mit ein paar Konsequenzen. Observeable.Start() mit anschließendem Subscribe zum Abholen der Ergebnisse ist ebenfalls einfach. Der Dispatcher ist klassisch.

Dies ist ein Anwendungsbeispiel. Mal schaun was es dort noch so gibt…

 

Hier nochmals der der gesamte Programmcode:

private void Window_Loaded(object sender, RoutedEventArgs e) {
    //Subscribe on the TextChanged - Event
    var input = (from evt in Observable.FromEvent<TextChangedEventArgs>(
                        this.txtEnter, "TextChanged")
                    select ((TextBox)evt.Sender).Text);

    var subscription2 = input
        .Skip(5)   //Skip until there are at least 6 characters entered
        .Throttle(TimeSpan.FromMilliseconds(500))  //Wait until a pause of the user...
        .Subscribe(n => {

            Observable.Start(() => {
                return GetResults(n);  //Call this method asynchron!
            }).Subscribe((result) => { //subscribe to the result as soon as it is back

                //It's still an other thread because of Throttle :(
                this.lstBx.Dispatcher.BeginInvoke((ThreadStart)delegate() {
                    //Show the result in the listbox
                    lstBx.Items.Clear();
                    foreach (var item in (IEnumerable<string>)result)
                        lstBx.Items.Add(item);
                });
            });
        });

}

static int cnt = 0;
private static IEnumerable<string> GetResults(string searchStuff) {
    cnt++;
    System.Threading.Thread.Sleep(1000);
    return new String[] { "Some" + cnt, "Results" + cnt,
        "From" + cnt, "The" + cnt, "Server" + cnt };
}
Categories: C# | RX

Add comment




  Country flag

biuquote
  • Comment
  • Preview
Loading