Fino ad oggi siamo stati abituati a far interagire le nostre applicazioni con l’ambiente in cui queste vivono in maniera “interattiva”, nel senso che è il nostro programma che “interroga” lo stato dell’ambiente e agisce di conseguenza. Per fare un esempio pensiamo all’interfaccia IEnumerable, essa ci restituisce un IEnumerator che ci permette attraverso la chiamata al metodo MoveNext() di ottenere un elemento(nella proprietà Current); in tal senso siamo “interattivi” ovvero siamo noi che chiediamo il prossimo elemento all’enumeratore. Un’altro modo di approcciare è quello di essere “reattivi” ovvero metterci nelle mani dell’ “ambiente”(l’ambiente nel nostro caso potrebbe essere rappresentato da una semplice classe) e lasciare a lui il compito di “attivarci”. Questo modo di lavorare può così essere definito “basato su eventi” e “asincrono”(by design, è l’ambiente che ci attiva e non sappiamo quando). Dalla versione 4.0 del .NET Framework abbiamo a disposizione 2 nuove interfaccie nate proprio allo scopo di permetterci di creare ambienti che siano “reattivi”(per versioni più vecchie di framework le possiamo scaricare insieme all Rx Framework di cui parliamo tra un pò). Le interfaccie in questione sono System.IObservable<> e System.IObserver<>. Vediamole in dettaglio:
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
Se le guardiamo con attenzione ci accorgiamo subito che queste due interfacce sono matematicamente il duale di IEnumerable/IEnumerator e ci permettono di avere un funzionamento di tipo “push” invece che il classico “pull” dell’IEnumerable/IEnumerator. Quando dico duale intendo il fatto che:
- per quanto riguarda IObservable/IEnumerable al posto del metodo GetEnumerator() che ritorna IEnumerable abbiamo il metodo Subscrible che accetta un parametro di tipo IObserver
- per quanto riguarda IObserver/IEnumerator non abbiamo più il metodo MoveNext() che ci permette di spostarci sul “prossimo” elemento(che verrà messo in Current), ma abbiamo il metodo OnNext(void) che ci “passa”(quando l’”ambiente” vuole) l’elemento corrente
-nell' IEnumerator il metodo MoveNext() comunica con un booleano se ci sono ancora elementi invece nel caso dell’IObserver abbiamo il metodo void OnCompleted()
-nel caso di eccezioni nell’ IEnumerator queste vengono sollevate alla chiamata del MoveNext() mentre nell IObserver abbiamo un metodo OnError che ci fornisce il dettaglio dell’errore
Detto questo; i campi di applicazione di questa metodologia basata su eventi asincroni sono innumerevoli e vanno dalla scrittura di “reazioni”(per rendere l’idea) ad eventi di una applicazione WPF, all’utilizzo nel mondo “asicrono” per natura del web e in qualsiasi parte del nostro codice dove non siamo noi che “possiamo” decidere quando “andare avanti”, ma dove è l’ “ambiente” che ce lo comunica.
Parallelamente a questo in casa Microsoft è nato un progetto dai DevLabs chiamato “Reactive Extensions for .NET (Rx)”, con lo scopo di creare degli extension method che permettano(come per IEnumerable) di effettuare delle LINQ query su oggetti di tipo IObservable come possono ad esempio diventare normali eventi .NET o librerie che utilizzano il pattern APM(Asynchronous Programming Design Patterns), ma non solo. In questa libreria abbiamo molti extension method che ci permettono di effettuare “composizioni” anche molto complesse di oggetti IObservable senza doverci preoccupare di problematiche come sincronizzazione, multithreading, coordinamento tra i vari “input streams”(alla fine se devo unire tra loro più IObservable in qualche modo devo sincronizzare i vari OnNext()).
Un’ esempio dell’applicazione di questo pattern potrebbe essere lo sviluppo di una classe che permetta di mettersi in ascolto di messaggi che arrivano in modo “asincrono” da un WebService WCF con un contratto di CallBack, ad esempio con il seguente contratto:
namespace WCFService
{
[ServiceContract(CallbackContract=typeof(IEvent))]
public interface IEventService
{
[OperationContract]
void Subscribe();
[OperationContract]
void Unsubscribe();
}
[ServiceContract]
public interface IEvent
{
[OperationContract]
void EventFired(XElement data);
}
}
Subscribe/Unsubscribe servono per abbonare/disabbonare il client e il contratto di callback ha come argomento un XElement di questa forma:
<Order id="10" customerId="1" xmlns="urn:GestioneOrdini" />
l’xml porta un’ipotetico OrderId e CustomerId.
La natura “event based” del funzionamento del servizio si presta bene a costrure un oggetto di tipo IObservable che permetta lato client di mettersi in ascolto per determinati tipi di ordine, magari solo quelli fatti da un determinato customer(esempio customerId == 1). Lato client il codice “standard” WCF per poter gestire le callback è la costruzione di un oggetto che implementa l’interfaccia di callback per passarne poi un’instanza in fase di creazione al proxy che effettua le chiamate, nel nostro caso:
class CallbackClass : WCFClientProxy.IEventServiceCallback
{
public delegate void ServiceFire(object sender, ServiceEvent data);
public event ServiceFire EventFireEvent;
public void EventFired(XElement data)
{
if (this.EventFireEvent != null)
this.EventFireEvent(this, new ServiceEvent { Data = data });
}
}
nella mia implementazione ho deciso di far sollevare un’evento al momento dell’arrivo di un messaggio dal servizio, l’evento in questione ha un EventArgs del tipo:
public class ServiceEvent : EventArgs
{
public XElement Data { get; set; }
}
la proprietà “Data” porta chiaramente il payload Xml ricevuto. Grazie all Rx framework e alle nostre 2 nuove interfaccie possiamo permettere a chi utilizza il servizio di scrivere un codice del tipo:
class Program
{
static void Main(string[] args)
{
WCFServiceListener listener = new WCFServiceListener();
var xs = from ev in listener
where ((int)ev.EventArgs.Data.Attribute("customerId")) == 1
select ev;
IDisposable subscription =
xs.Subscribe(e => Console.WriteLine(e.EventArgs.Data));
Console.ReadKey();
subscription.Dispose();
Console.ReadKey();
}
}
WCFServiceListener è una classe che implementa IObservable<T> che al suo interno racchiude la logica di Subscribe/Unsubscribe; la parte più interessante è l’implementazione del metodo Subscribe dell’interfaccia:
public IDisposable Subscribe(IObserver<IEvent<ServiceEvent>> observer)
{
if (TraceDispose._subscriblerCount == 0)
_proxy.Subscribe();
return
new TraceDispose(Observable.FromEvent<ServiceEvent>(
_callBackClass,
"EventFireEvent")
.Subscribe(observer), _proxy);
}
attraverso un metodo dell’ Rx framework(metodo statico FromEvent della classe Observable) creo un IObservable a partire da un’evento(nel nostro caso EventFireEvent della classe CallbackClass) e da un’instanza dell’oggetto(_callBackClass passata al proxy in fase di creazione) che può sollevare un evento con quel nome. La classe TraceDispose serve solo per la MIA implementazione interna a tenere il conto di quanti si sono “abbonati” al servizio per chiamare il metodo _proxy.Unsubscribe() in caso TraceDispose._subscriblerCount sia uguale a 0.
Tornando al codice del nostro main possiamo vedere come diventa estremamente chiaro e facile capire cosa stiamo facendo:
1)creo il mio IObservable
WCFServiceListener listener = new WCFServiceListener();
2)attraverso l’extension method “Where” dell’Rx filtro i messaggi che arrivano ricevendo solo quelli in cui il customerId è 1.
var xs = from ev in listener
where ((int)ev.EventArgs.Data.Attribute("customerId")) == 1
select ev;
3)
IDisposable subscription =
xs.Subscribe(e => Console.WriteLine(e.EventArgs.Data));
a questo punto mi “abbono” all’IObservable(questa versione di Subscribe fa parte sempre dell’ Rx framework, lo possiamo notare in quanto la firma dell’interfaccia non porta un metodo che accetta delle lambda, è l’overload che crea per noi un IObserver che in caso di chiamata OnNex() chiama il nostro delegate) e passo come delegate in caso di OnNext(quando arriva il messaggio dal server) la stampa in console del messaggio Xml.
Da questo punto in poi la mia applicazione è in ascolto in modo asincrono del servizio e appena arrivano gli ordini lo vediamo stampato in console, ad esempio:
possiamo notare il fatto che customrId è sempre 1 grazie alla nostra query LINQ sull’IObservable. Per disabbonarsi dobbiamo semplicemente chiamare Dispose sull’oggetto che ci viene ritornato in fase di Subscribe(Subscribe ritorna un oggetto IDisposable come da interfaccia) :
subscription.Dispose();
Naturale vero?A me piace un sacco, finalmente una sintassi “intuitiva” per tutte quelle applicazioni che “reagiscono” all’ambiente, supportato da un sacco di possibilità di “composition”(Rx framework ed extension method vari) e senza dover gestire problematiche come concorrenza e sincronizzazione.
Se volete approfondire questa tecnologia vi consiglio di farvi un giro sul portale ufficiale
Reactive Extensions for .NET (Rx)
Rx è utilizzabile con il .NET Framework 3.5 SP1,.NET Framework 4.0 e Silverlight 3
Se volete scaricare la demo del post
attenzione la demo non deve essere usata in linea…non è stata testata e potrebbe presentare problemi di sincronizzazione e di gestione delle eccezioni…ah serve VS2010(Beta2)