Building Reactive Apps – Users Want – Reactive Web



Building Reactive Apps – Users Want – Reactive Web

0 1


building-reactive-apps


On Github jamesward / building-reactive-apps

Building Reactive Apps

James Ward ~ @_JamesWard

Users Want

  • In-Sync Data
  • Real-time Collaboration
  • Instant Feedback
  • To Not Wait

Users Want Reactive Apps

www.ReactiveManifesto.org

Going Reactive

  • Reactive Web
  • Reactive Actors
  • Reactive Streams

Reactive Web

Async + Non-Blocking

  • Reactive Requests
  • Reactive Composition
  • Reactive Push
  • 2-Way Reactive

Reactive Requests

Jump to: Java Code | Java 8 Code | Scala Code

Scala Code Examples

Blocking Request

def foo = Action {
  Ok("foo")
}

Async Request

def foo = Action.async {
    Future.successful(Ok("foo"))
}

Reactive Request (Async + Non-Blocking)

def pause(duration: Int) = Action.async {
  Promise.timeout(Ok(duration.toString), duration seconds)
}

Reactive WS Client

val f: Future[Response] = WS.url("http://www.foo.com").get

Reactive Composition

def foo = Action.async {
  val futureResponse = WS.url("http://www.foo.com").get
  futureResponse.map { response =>
    Ok(response.body)
  }
}

Reactive Composition

def foo = Action.async {
  val futureJW = WS.url("http://www.jamesward.com").get
  val futureTwitter = WS.url("http://www.twitter.com").get
  for {
    jw <- futureJW
    twitter <- futureTwitter
  } yield Ok(jw.response.body + twitter.response.body)
}

Reactive Push with SSE

Server-side:

def events = Action {
  val (enumerator, channel) = Concurrent.broadcast[String]

  Promise.timeout(channel.push("hello"), 1.second)

  Ok.feed(enumerator &> EventSource()).as(EVENT_STREAM)
}

Client-side:

$ ->
  events = new EventSource("/events")
  events.onmessage = (e) ->
    console.log(e.data)

2-Way Reactive with WebSockets

Server-side:

def echoWs = WebSocket.using[String] { request =>
  val (enumerator, channel) = Concurrent.broadcast[String]
  val in = Iteratee.foreach[String](channel.push)
  (in, enumerator)
}

Client-side:

$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Resilient Futures

future.recover {
  case Exception =>
    Logger.error("Failed!")
    InternalServerError("Boum!")
}

Java Code Examples

Blocking Request

public static Result index() {
    return ok(views.html.index.render("hello"));
}

Async Request

public static F.Promise<Result> index() {
    return F.Promise.promise(new F.Function0<Result>() {
        public Result apply() {
            return ok(views.html.index.render("hello"));
        }
    });
}

Reactive Request (Async + Non-Blocking)

public static F.Promise<Result> index() {
    return F.Promise.delayed(new F.Function0<Result>() {
        public Result apply() throws Throwable {
            return ok(views.html.index.render("hello"));
        }
    }, 5, TimeUnit.SECONDS);
}

Reactive Requests

public static F.Promise<Result> index() {
    F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
    return jw.map(new F.Function<WS.Response, Result>() {
        public Result apply(WS.Response response) throws Throwable {
            return ok(response.getBody());
        }
    });
}

Reactive Composition

public static F.Promise<Result> index() {
    final F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
    final F.Promise<WS.Response> tw = WS.url("http://www.twitter.com").get();
    return jw.flatMap(new F.Function<WS.Response, F.Promise<Result>>() {
        public F.Promise<Result> apply(final WS.Response jwR) throws Throwable {
            return tw.map(new F.Function<WS.Response, Result>() {
                public Result apply(WS.Response twR) throws Throwable {
                    return ok(twR.getBody() + jwR.getBody());
                }
            });
        }
    });
}

Reactive Push with SSE

public static Result events() {
    EventSource eventSource = new EventSource() {
        public void onConnected() {
            sendData("hello");
        }
    };
    return ok(eventSource);
}

$ ->
  events = new EventSource("/events")
  events.onmessage = (e) ->
    console.log(e.data)

2-Way Reactive with WebSockets

public static WebSocket<String> echo() {
    return new WebSocket<String>() {
        public void onReady(final In<String> in, final Out<String> out) {
            in.onMessage(new F.Callback<String>() {
                public void invoke(String message) throws Throwable {
                    out.write(message);
                }
            });
        }
    };
}

$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Java 8 Code Examples

Blocking Request

public static Result index() {
    return ok("foo");
}

Async Request

public static F.Promise<Result> foo() {
    return F.Promise.promise(() -> ok("foo"));
}

Reactive Request (Async + Non-Blocking)

public static F.Promise<Result> foo() {
    return F.Promise.delayed(() -> ok("foo"), 5, TimeUnit.SECONDS);
}

Reactive Requests

public static F.Promise<Result> foo() {
    F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
    return jw.map(response -> ok(response.getBody()));
}

Reactive Composition

public static F.Promise<Result> foo() {
    F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
    F.Promise<WS.Response> tw = WS.url("http://www.twitter.com").get();
    return jw.flatMap(jwr ->
        tw.map(twr ->
            ok(twr.getBody() + jwr.getBody())));
}

Reactive Push with SSE

public static Result events() {
    EventSource eventSource = new EventSource() {
        public void onConnected() {
            sendData("hello");
        }
    };
    return ok(eventSource);
}

$ ->
  events = new EventSource("/events")
  events.onmessage = (e) ->
    console.log(e.data)

2-Way Reactive with WebSockets

public static WebSocket<String> echo() {
    return new WebSocket<String>() {
        public void onReady(final In<String> in, final Out<String> out) {
            in.onMessage(out::write);
        }
    };
}

$ ->
  ws = new WebSocket("ws://localhost:9000/echo")
  ws.onopen = () ->
    ws.send("foo")
  ws.onmessage = (message) ->
    console.log(message.data)

Reactive Actors

Event-Driven Workers

  • Managed Concurrency
  • Isolated Failure Handling (Supervision)
  • Scales up & out

Actors & Requests

public class FooActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        F.Promise<WS.Response> jw = WS.url("http://www.jamesward.com").get();
        Patterns.pipe(jw.wrapped(), getContext().dispatcher()).to(getSender());
    }
}

public static F.Promise<Result> foo() {
    ActorRef fooActor = Akka.system().actorOf(Props.create(FooActor.class));

    F.Promise<Object> p = F.Promise.wrap(Patterns.ask(fooActor, "foo", 5000));

    return p.map(r -> ok(((WS.Response) r).getBody()));
}

Reactive Streams

A JVM standard for asynchronous stream processing with non-blocking back pressure

www.reactive-streams.org

Get Started

Typesafe Activator Templates