Benutzer:MovGP0/NET/Rx

aus Wikipedia, der freien Enzyklopädie
Zur Navigation springen Zur Suche springen
   MovGP0        Über mich        Hilfen        Artikel        Weblinks        Literatur        Zitate        Notizen        Programmierung        MSCert        Physik      


Reactive Extensions

[Bearbeiten | Quelltext bearbeiten]
LINQ vs. Rx[1]
Synchronous Asynchronous
Single value
// Synchronous call
int x = f(42);

// Synchronous call
int y = g(x);
// Asynchronous call
int x = await fAsync(42);

// Asynchronous call
int y = await gAsync(x);
Multi value
// Shopping database
var res = from o on orders
          from p in p.products
          where p.Price > 29.95m
          select p.Name

// Synchronous MoveNext
res.ToList().ForEach(p => Console.WriteLine(p));
// Stock trade events
var res = from t in trades
          from q in t.Symbols
          where q.QUote > 29.95m
          select q.Symbol

// Asynchronous OnNext
res.Subscribe(p => Console.WriteLine(p));
IObservable<T> vs. ISubject<T> vs. IObserver<T>
Wichtige Interfaces
Interface Bedeutung
IObservable<T>
  • IDisposable Subscribe(IObserver<T>)
  • IDisposable Subscribe(Action<T>)-Extension method

siehe auch: Disposable.Empty

IObserver<T> wird der Observablen übergeben
  • OnNext(T)
  • OnError(Exception)
  • OnCompleted()
ISubject<T> an Observer<T> and Observable<T> at the same time.
Subject<T> Observer bekommt alle Daten ab dem Zeitpuntk der Subscription
ReplaySubject<T>
  • Cached alle Daten und übergibt sie dem Observer zum Zeitpunkt der Registrierung
  • Cache kann über maximale Anzahl der zwischengespeicherten Elemente bzw. Cachedauer gesteuert werden
BehaviourSubject<T> Wie ReplySubject, hat zum Zeitpunkt der Subscription jedoch garantiert einen Wert
AsyncSubject<T> Immer genau ein Wert. Observer wird einmalig aufgerufen sobald ein Wert übergeben wird.

Factory Methods

[Bearbeiten | Quelltext bearbeiten]
Factory Methods to create an IObservable<T>
Observable.Return<T>(T) takes a value and returns only that value once.
Observable.Empty<T>() completes without return anything.
Observable.Never<T>() infinite sequence without notifications
Observable.Throw<T>(Exception) throws an exception without returning any value
Observable.Create<T>(observer => { ... }) Factoty method to create an observable

Observable.Create

[Bearbeiten | Quelltext bearbeiten]
Create Blocking Observable Create Non-Blocking Observable
private static IObservable<string> NonBlocking()
{
   var subject = new ReplaySubject<string>();
   subject.OnNext("a");
   subject.OnNext("b");
   subject.OnCompleted();
   Thread.Sleep(1000);
   return subject;
}
private static IObservable<string> NonBlocking()
{
   return Observable.Create<string>(subject =>{
      subject.OnNext("a");
      subject.OnNext("b");
      subject.OnCompleted();
      Thread.Sleep(1000);
      return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed."));
   });
}
Observable.Empty Observable.Return
public static IObservable<T> Empty<T>()
{
   return Observable.Create<T>(observer => {
      observer.OnCompleted();
      return Disposable.Empty;
   });
}
public static IObservable<T> Return<T>(T value)
{
   return Observable.Create<T>(observer => {
      observer.OnNext(value);
      observer.OnCompleted();
      return Disposable.Empty;
   });
}
Observable.Never Observable.Throws
public static IObservable<T> Never<T>()
{
   return Observable.Create<T>(observer => {
      return Disposable.Empty;
   });
}
public static IObservable<T> Throws<T>(Exception exception)
{
   return Observable.Create<T>(observer => {
      observer.OnError(exception);
      return Disposable.Empty;
   });
}
Observable.Intervall Observable.Timer
public static IObservable<long> Intervall(TimeSpan period)
{
   return Observable.Generate(0L, 
      i => true, 
      i => i + 1, 
      i => i, 
      i => i == 0 ? TimeSpan.Zero : period
   );
}
public static IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
{
   return Observable.Generate(0L, 
      i => true, 
      i => i + 1, 
      i => i, 
      i => i == 0 ? dueTime : period
   );
}
Timer Unfoding
var observable = Observable.Create<string>(observer => {
    ElapsedEventHandler handler = 
      (sender, args) => observer.OnNext(args.SignalTime.ToString());

    var timer = new Timer();
    timer.Interval = 1000;
    timer.Elapsed += handler;
    timer.Start();
    return Disposable.Create(() => { 
        timer.Elapsed -= handler;
        timer.Dispose();
        Console.WriteLine("Timer stopped.");
    });
});

using(var subscription = observable.Subscribe(Console.WriteLine))
{
    Thread.Sleep(5000);
}
void Main()
{
   var integers = Unfold(1, i => i + 1);
   foreach(var value in integers.Take(20))
   {
      Console.WriteLine(value);
   }
}

private static IEnumerable<T> Unfold<T>(T seed, Func<T, T> accumulator)
{
   var nextValue = seed;
   while(true)
   {
      yield return nextValue;
	  nextValue = accumulator(nextValue);
   }
}

Anmerkung: Das .NET Framework stellt mehrere Timer zur Auswahl:

  • System.Timers.Timer
  • System.Threading.Timer
  • System.Windows.Threading.DispatcherTimer
  • (System.Diagnostics.Stopwatch)
Observable.Range Observable.Generate
void Main()
{
   Observable.Range(10,10).Subscribe(Console.WriteLine);
}
void Main()
{
   Range(10,10).Subscribe(Console.WriteLine);
}

public static IObservable<int> Range(int initial, int count)
{
   return Observable.Generate<int,int>(initial, i => (i <= initial + count), i => i + 1, i => i);
}
Observable.Interval Observable.Timer
Intervall every 250 milliseconds
var observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using(var subscription = observable.Subscribe(Console.WriteLine))
{
   // 0..18
   Thread.Sleep(5000);  
}
Event after 1 second
var observable = Observable.Timer(TimeSpan.FromSeconds(1));
using(var subscription = observable.Subscribe(Console.WriteLine, () => Console.WriteLine("fin")))
{
   // 0, fin
   Thread.Sleep(5000);  
}
Interval starting after 0 seconds
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
using(var subscription = observable.Subscribe(Console.WriteLine))
{
   // 0..19
   Thread.Sleep(5000);  
}

Paradigm Transition

[Bearbeiten | Quelltext bearbeiten]

From Action/Func

[Bearbeiten | Quelltext bearbeiten]
Observable.Start
void StartAction()
{
   var start = Observable.Start(() => {
      Console.WriteLine("working...");
      Enumerable.Range(0, 100).ToList().ForEach(i => {
         Thread.Sleep(100);
         Console.Write(".");
      });
   });

   start.Subscribe(
      unit => Console.WriteLine("\nUnit published."), 
      () => Console.WriteLine("Action completed."));
}
void StartFunc()
{
   var start = Observable.Start(() => {
      Console.WriteLine("working...");
      Enumerable.Range(0, 100).ToList().ForEach(i => {
         Thread.Sleep(100);
         Console.Write(".");
      });
      return "Value";
   });

   start.Subscribe(
      value => Console.WriteLine(string.format("\n{0} published.", value)), 
      () => Console.WriteLine("Action completed."));
}
// EventHandler delegate
var appActivated = Observable.FromEventPattern(
   handler => Application.Current.Activated += handler, 
   handler => Application.Current.Activated -= handler);

// Subclass of EventHandler
var propChanged = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
   handler => handler.Invoke, 
   handler => this.PropertyChanged += handler, 
   handler => this.PropertyChanged -= handler);

// EventHandler<TEventArgs>
var firstChangeException = Observable.FromEventPattern<firstChangeException>(
   handler => AppDomain.CurrentDomain.FirstChanceException += handler,
   handler => AppDomain.CurrentDomain.FirstChanceException -= handler);
var task = Task.Factory.StartNew(() => "Test");
var source = task.ToObservable();
source.Subscribe(Console.WriteLine, () => Console.WriteLine("completed"));

From IEnumerable<T>

[Bearbeiten | Quelltext bearbeiten]

Note: use StopWatch to test impact, when converting from synchronous IEnumerable to asynchronous IObservable!

// do not use; example code only
public static IObservable<T> ToObservable<T>(IEnumerable<T> source)
{
   return Observable.Create<T>(observer => {
      foreach(var item in source)
      {
         observer.OnNext(item);
      }
      return Disposable.Empty;
   });
}

From APM (Asynchronous Programming Model)

[Bearbeiten | Quelltext bearbeiten]

Note: .NET 3.5/.NET 4.0 only. Was replaced in .NET 4.5/Rx 2.0 by the async/await pattern. See Rxx for examples of current version.

Web Request Stream
public class WebRequest
{
   public WebResponse GetResponse() { ... }
   public IAsyncResult BeginGetResponse(AsyncCallback callbac, object state) { ... }
   public WebResponse EndGetResponse(IAsyncResult asyncResult) { ... }
}
public class Stream
{
   public int Read(byte[] buffer, int offset, int count) { ... }
   public IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { ... }
   public int EndRead(IAsyncResult asyncResult) { ... }
}
// BeginRead takes <byte[], int, int, ...> 
// EndRead returns <..., int>
var observableStream = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);

Sequence Reduction

[Bearbeiten | Quelltext bearbeiten]
ABC's of Functional Programming
Name alternative Names
Anamrophism Ana, Unfold, Generate
Bind Map, SelectMany, Projection, Transform
Catamorphism Cata, Fold, Reduce, Accumulate, Inject
Rx Extension Methods
Group Creation Reduction Inspection Aggregation Transformation
Operators  
  • Where
  • Distinct
  • DistinctUntilChanged
  • IgnoreElements
  • Skip
    • SkipWhile
    • SkipLast
    • SkipUntil
  • Take
    • TakeWhile
    • TakeLast
    • TakeUntil
  • Any
  • All
  • Contains
  • DefaultIfEmpty
  • ElementAt
  • SequenceEqual
  • Count
  • Min
  • Max
  • Sum
  • Average
  • First
  • Last
  • Single
Custom
  • Aggregate
  • Scan
Partitioning
  • MinBy
  • MaxBy
  • GroupBy
  • Select
  • Cast
  • OfType
  • Timestamp
  • TimeInterval
  • Materialize
  • Dematerialize
  • SelectMany
  • Catch
  • Finally
  • Using
  • OnErrorResumeNext
  • Retry

Sequence Combination

[Bearbeiten | Quelltext bearbeiten]
Sequential Concurrent Pairing
  • Concat
  • Repeat
  • StartWith
  • Amb
  • Merge
  • Switch
  • CombineLatest
  • Zip
  • And-Then-When
  • UI Application
    • handler blocks for < 50ms
      • use TaskPoolScheduler when available
      • use ThreadScheduler when TaskPoolScheduler is not available
    • handler blocks for > 50ms (ie. I/O)
      • use NewThreadScheduler
  • Service Application
    • Data from a Queue
      • EventLoopScheduler prevents order of events
    • handler blocks for > 50ms (ie. I/O)
      • use NewThreadScheduler
    • handler blocks for < 50ms
      • use TaskPoolScheduler if available
      • use ThreadScheduler when TaskPoolScheduler is not available

Cost:

  • Creating a Thread Pool takes 500ms
  • Creating a Thread takes 50ms and 2MB of RAM
Important Schedulers
Scheduler Usage
ImmediateScheduler Scheduler.Immediate
CurrentThreadScheduler Scheduler.CurrentThread
DispatcherScheduler DispatcherScheduler.Current
Scheduler.ThreadPool
EventLoopScheduler
NewThreadScheduler NewThreadScheduler.Default
Scheduler.NewThread
ThreadPoolScheduler ThreadPoolScheduler.Default
TaskPoolScheduler TaskPoolScheduler.Default
new WindowsFormsSynchronizationContext() WinForms UI Scheduler
new SynchronizationContextScheduler(SynchronizationContext.Current) WPF UI Scheduler
Handle ArgumentExceptions on EventSubscriptions

System.ArgumentException: Cannot bind to the target method because its signature or security transparency is not compatible with that of the delegate type.

Instead of:

return Observable.FromEvent<StrokeCollectionChangedEventHandler, StrokeCollectionChangedEventArgs>(
        eh => strokeCollection.StrokesChanged += eh,
        eh => strokeCollection.StrokesChanged -= eh);

try:

return Observable.FromEvent<StrokeCollectionChangedEventHandler, StrokeCollectionChangedEventArgs>(
        handler =>
        {
            StrokeCollectionChangedEventHandler eh = (sender, args) => handler(args);
            return eh;
        },
        eh => strokeCollection.StrokesChanged += eh,
        eh => strokeCollection.StrokesChanged -= eh);

Internetquellen

[Bearbeiten | Quelltext bearbeiten]

Rx on the server

[Bearbeiten | Quelltext bearbeiten]
TPL Dataflow
  1. Reactive Extensions v2.0 Beta available now! In: Reactive Extensions Team Blog. Microsoft, 12. März 2012, abgerufen am 26. Mai 2014.