Reactive Programming – Async Can Be Easy – What is Observable?



Reactive Programming – Async Can Be Easy – What is Observable?

1 1


rx-devconf-presentation


On Github Jiri-Kremser / rx-devconf-presentation

Reactive Programming

Async Can Be Easy

Jiří Kremser < jkremser(at)redhat.com >

#hawkular on Freenode

2016-01-27

A-sync?

Async!

Multiple events and computations happening simultaneously Not necessarily multi-threading - JavaScript event loop.

Asynchronous Programming is Annoying

  • Each language/framework has its own way of expressing async/event-based programming
    • callbacks, Promises/Futures, listeners, low level events, ...
    • Each concept covers only part of the story

Asynchronous Programming is Annoying

  • Callback
    public void getData(Callback<T> callMeMaybe);
  • Future/Promise
    public Future<T> getData();
  • WT*?
    public Future<List<Future<T>>> getData();
  • Observable
    public Observable<T> getData();
  • Each technique needs to justify itself, oftenly by attacking the status quo.
  • So why am i talking about it? How reactive programming can help here?

Where to find it?

..in Rx libraries

“Telling a programmer there's already a library to do ${x} is like telling a songwriter there's already a song about love.” --internets 20 sec - auto switch

Uber Standard? Yeah sure..

20 sec - auto switch

FRP

  • Functional Reactive Programming
  • Language agnostic model with 3 concepts:
  • Observer, Observable
  • Combinators (filter, map, reduce, etc.)
  • How, Where, When (Schedulers)
  • Why functional?
Combinators ~ Operators

Because..

Why functional?

  • Future, Observable, Collection, Optional are monads
  • Pure functions on Observables, cannot be mutated
  • Explicitly dealing with side-effects
    () => Future[Try[Option[T]]]
  • We can call flatMap on them ~ bind operator
  • Future and Observable are monads that deals with latency

Applied Duality

Single value Mutiple values Pull/Synchronous/Interactive Object Iterable Push/Asynchronous/Reactive Future/Promise/Task Observable
  • In Pull model: Data consumer synchronously pulls from producer
  • In Push model: Data producer asynchronoulsy pushes to consumers
  • Future represents a future value and it'll materialize itself eventually, or fail.

What is Observable?

  • represents a push based collection
  • can be combined with other observables
  • monad
  • any number of values over any amount of time
Iterable Observable pull push
T next()
onNext(T)
throws new Exception();
onError(Exception)
return;
onCompleted()
(onNext)*(onError|onCompleted)?
happy paths: ─ onNext ─ onNext ─ onNext ─ onNext ─ onNext ─ … ⟶ ─ onNext ─ onNext ─ onNext ─ onCompleted ⟼ less happy path: ─ onNext ─ onNext ─ onNext ─ onNext ─ onError ⟼

Memory Effectiveness

Stream.of(3, 1, 4, 1, 5, 9, 2)
      .flatMap(x -> Stream.of(x, x))
      .map(x -> x * 2)
      .filter(x -> x > new Random().nextInt(9));
vs
Observable.just(3, 1, 4, 1, 5, 9, 2)
          .flatMap(x -> Observable.just(x, x))
          .map(x -> x * 2)
          .filter(x -> x > new Random().nextInt(9));
  • intermediate results => less GC
  • 1 by 1 processing, data is only iterated over once
  • hot potatoe
  • lazy approach, no observer - no processing

Factories

  • .just()
  • .create()
  • .from()
  • .range()
  • .repeat()
  • .interval()
  • .timer()
  • .empty() / .never() / .throw()
  • ...

Combinators for Observable[T]

  • .map(f: T -> R): Observable[R]
  • .flatMap(f: T -> Observable[R]): Observable[R]
  • .filter()
  • .zip(), .merge(), .concat(), groupBy(), sample(), etc.
  • map ~ projection
  • flatMap is more powerful - it can change the number of elements/events
  • filter ~ selection
  • join ~ zip/merge/concat
  • relational algebra

Hot vs Cold

  • Cold ones start producing notifications after subscription (default).
  • Hot ones don't care about subscribers.

Demo 1

Talk is cheap ;)

Rx is everywhere (JVM)

  • Java
    Observable.interval(1, TimeUnit.SECONDS)
              .subscribe(System.out::println);
  • Scala
    Observable.interval(1, TimeUnit.SECONDS)
              .subscribe(println(_))
  • Groovy
    Observable.interval(1, TimeUnit.SECONDS)
              .subscribe{println it}
  • JRuby
    Observable.interval(1, TimeUnit.SECONDS)
              .subscribe {|val| puts val}
  • Clojure
    (->
      (Observable/interval 1 TimeUnit/SECONDS)
      (.subscribe (fn [arg] (println arg))))

Rx is everywhere

  • .NET
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Subscribe(x => Console.WriteLine(x));
  • C++
    rx::observable<>::interval(std::chrono::seconds(1))
       .subscribe(
         [](long v){std::cout << v;},
         [](){std::cout << "OnCompleted";}
       )
  • Python
    Observable.interval(1000)
              .subscribe(lambda x:  print x)
  • PHP
    \Rx\Observable::interval(1000, $scheduler)
                  ->subscribe(function ($x) { echo $x; });
  • Dart, Haskell, Android SDK, Objective-C & Swift, etc.

Rx is everywhere

  • Angular 2.x
  • ES7
  • reactive-streams.org
  • ${your_project}
  • interop between different reactive system

Websockets

const wsUri = 'wss://echo.websocket.org';

Rx.Observable
  .webSocket(wsUri)
  .filter((e) => e.topic === 'kittens')
  .subscribe((e) => console.log(e));

Websockets

const wsUri = 'wss://echo.websocket.org';

			Rx.Observable
			.webSocket(wsUri)
			.groupBy((e) => e.topic)
			.subscribe((e) => console.log(e));

Demo 2

  • Now, that you know that everything is a stream.. We can do the demo.

See the Pen xZaGWy by Jiri Kremser (@jkremser) on CodePen.

Testability

it('should filter in only prime numbers', function () {
	var source = hot('-1--2--^-3-4-5-6--7-8--9--|');
	var subs =              '^                  !';
	var expected =          '--3---5----7-------|';

	expectObservable(source.filter(isPrime)).toBe(expected);
	expectSubscriptions(source.subscriptions).toBe(subs);
});
(link) More of it here.
  • schedulers
  • overloaded combinators

Maturity

  • it is very solid

Recap

  • Observable ~ any number of values over any amount of time.
  • Rx make sense for UI as well as for the server-side.
  • Can be done in any language.
  • Cold Observables are lazy.

Want to Learn More?

  • given by Eric meyer who is also co-author of the reactivemanifesto.

Make your APIs observable

This presentation

https://goo.gl/TgFwfq

Jiří Kremser