Created by John Murray
Slides and Presentation on Github
git clone git@github.com:JohnMurray/akka-actors-futures-presentation.git cd akka-actors-futures-presentation/IntelliJProject sbt gen-idea
Futures offer us a way to perform async or defered tasks.
Future { println("Hello") } println("World")
You can use callbacks to order program execution
Future { println("Hello") }.onSuccess { case r => println("World") }
Futures are stateless (at least in that state you can define)
Future { val i = 1 } println(i) #=> Compile-time error: not found: value iWhat I mean by that is, the Future holds some state such as if it completed, succeeded, or failed, etc.
However, you can get the result in a callback (as we‘ve already seen)
Future { val i = 1 i }.onSuccess { case r => println(r) } #=> Output: 1
Possible callbacks
val future = Future { val i = 1 i } future.onSuccess { case result => println(result) } #=> output: 1
val future = Future { val i = 1 i / 0 } future.onFailure { case t : Throwable => println(t.getMessage) }
val future = Future { liveDangerously // throws exectpion 50% of time "made it!" } future.onComplete { result => result match { case Success(value) => println(value) case Failure(t) => println(t.getMessage) } }
You can also block on Futures and wait for the result.
val future : Future[Int] = Future { val i = 1; i } val result : Int = Await.result(future, 2 seconds) println(result)
Just like we saw in the Intro Course, we can also map Futures in the same way that we mapped Options.
val futureInt : Future[Int] = Future { 1 } val futureStr : Future[String] = futureInt.map(res => res.toString) futureStr.onSuccess{ case res => println(res) }
case class User(firstName: String, lastName: String) def getUserFromExternalService(id: Int): Future[User] = { ... } def printUser(id: Int) = { val userName = getUserFromExternalService(id).map { usr: User => s"${usr.firstName} ${usr.lastName}" } userName.onSuccess { case name: String => println(name) } }Note that you should avoid using
Awaitand I‘m really just trying to provide samples that are easy to understand/run.
case class Car(driverId: Int) def getCarFromexternalService(id: Int): Future[Car] = { ... }
def printDriver(carId: Int) = { val driverName = getCarFromexternalService(carId).map { car: Car => getUserFromExternalService(car.driverId).map { usr: User => s"${usr.firstName} ${user.lastName}" } } // Future[Future[String]] driverName.onSuccess { case dn => dn.onSuccess { case dnPrime => println(dnPrime) } } }
We can do better than this. What tool do we reach for when we have this nesting created by continued maping?
flatMap!
def printDriver(carId: Int) = { val driverName = getCarFromexternalService(carId).flatMap { car: Car => getUserFromExternalService(car.driverId).map { usr: User => s"${usr.firstName} ${user.lastName}" } } // Future[String] driverName.onSuccess { case dn => prinln(dn) } }
And life would not be complete without a for comprehension
So... let‘s use one.
def printDriver(carId: Int) = { val driverName = for { car <- getCarFromexternalService(carId) user <- getUserFromExternalService(car.driverId) } yield (s"${user.firstName} ${user.lastName}"}) println(Await.result(driverName, 2 seconds)) }
This is great!
We can transform / compose futures and combine them with other futures. This makes for very powerful and flexible API for concurrent programming.
But what about exceptions?
When I‘m doing all this "composing", what if an exception is thrown?
For example
def liveDangerously = { ... } // throws exception 50% of time val future = Future { liveDangerously "Hello" } future.onSuccess(println) future.onFailure(t : Throwable => println(t.getMessage))
What happens if, for example, when the future fails I wanted to provide a default value?
We can do this with recover
val dangerousFuture : Future[String] = Future { liveDangerously "Hello" } val safeFuture : Future[String] = dangerousFuture.recover { case Throwable => "Cheerio" } safeFuture.onSuccess { result => println(result) }
Your turn
def safeString(str: String, def: String) : Future[String] = { dangerousString(str).recover { case t: Throwable => def } } safeString("Hello", "Cheerio").flatMap{s : String => safeString(s + " there", s + " good") }.flatMap{s: String => safeString(s + " Frank", s + " sir") }
Uses the Scala API, but usage much more verbose since Java doesn‘t currently support lambda expressions.
Let‘s just suffer for a moment so you can see what it looks like.
Hello World Future
Future<String> f = future(new Callable<String>() { public String call() { return "Hello" + " " + "World"; } }, system.dispatcher()); f.onSuccess(new OnSuccess<String>() { public final void onSuccess(String msg) { System.out.println(msg); } });
Same print-user example as before
public void printUser(int id) { Future<String> userName = getUserFromExternalService(id).map( new Mapper<User, String>() { public String apply(User u) { return u.firstName + " " + u.lastName; } } ); System.out.println(Await.result( userName, new Duration(2).seconds, system.dispatcher() )); }
Now the print-driver example that we saw before
public void printDriver(int carId) { Future<String> driverName = getCarFromexternalService(carId).flatMap( new Mapper<Car, Future<String>>() { public Future<String> apply(Car car) { getUserFromExternalService(car.driverId).map( new Mapper<User, String>() { public String apply(User u) { return u.firstName + " " + u.lastName; } } ); } } ); System.out.println(Await.result(driverName, new Duration(2).seconds, system.dispatcher())); }
Okay, enough of that. You get the point.
The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent digital computation.
In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
Source: Wikipedia
The primary form of concurrency within Scala.
Built on asynchronous message passing
Implementation borrowed heavy from actor-based patterns in Erlang.
Threads present, but not preferred. Mainly for interopability / Actor implementation Erlang is built for high-availability systems - telecom - rabbitmqAn actor can be thought of as running in a seperate process as they share no state with the outside world.
Typical actor based development is akin to implementing a Service Oriented Architecture (SOA) within a single application.
Walk through a demonstration of building a web app with some actors: Authentication actor DB Service Actor - returns data Handler actor - receives incoming request etc.All actors belong to an ActorSystem. The system contains all actors and handles all communication between actors.
Draw a representation of an actor system (big circle with smaller circles [actors] inside).Actors can run in the same process, on the same machine (different processes), or on different servers (possibly in different data-centers).
Communication is done either by inter-system communication or over TCP using a custom Akka protocol.
Edit the previous drawing to show that ActorSystems on different processes communicate to each other (possibly not over a port). Edit the previous drawing to show that the ActorSystem can expose a port to handle incoming requests on (Akka Remoting).All actors have a Supervisor. The supervisor is responsible for monitoring the lifecycle of the Actor. This includes failures / errors and termination.
The Supervisor to any given actor is the Actor that created it. The default Actor in Akka is the "Guardian" actor.
Actors can be put behind a router, allowing you to do things like creating a pool of workers.
Hello World Actor
class HelloActor extends Actor { def receive = { case msg: String => println(msg) } } // create actor (code omitted) helloActor ! "Hello"
Hello World Actor
class HelloActor extends Actor { def receive = { case msg: String => println(msg) } } // create actor (code omitted) helloActor ! "Hello" println(" World")Talk about async-processing
State is self-contained
class IncActor extends Actor { var count: Int = 0 def receive = { case incBy: Int => this.count += incBy } } // create actor (code omitted) incActor ! 3 incActor ! 4 incActor ! 5 println(incActor.count) # => Compile Time Exception!Talk about how the actor is encapsulated within an Actor Ref (briefly).
State is self-contained
class IncActor extends Actor { var count: Int = 0 def receive = { case incBy: Int => this.count += incBy case "count" => println(this.count) } } // create actor (code omitted) incActor ! 3 incActor ! 4 incActor ! 5 incActor ! "count"Talk about how the actor is encapsulated within an Actor Ref (briefly).
State is self-contained
class IncActor extends Actor { var count: Int = 0 def receive = { case incBy: Int => this.count += incBy case "getCount" => sender ! this.count } } // create actor (code omitted) incActor ! 3 // ... Patterns.ask(incActor, "getCount").onSuccess { count => println(count) }Talk about how data received from the actor has to be processed asynchronously (callbacks) because actors receive, process, and send messages asynchronously.
public class HelloActor extends UntypedActor { @Override public void onReceive(Object msg) throws Exception { if (msg instanceOf String) { System.out.println((String) msg); } } } // create actor (code omitted) helloActor.tell("Hello", ActorRef.noSender());
public class IncActor extends UntypedActor { private int count = 0; @Override public void onReceive(Object msg) throws Exception { if (msg instanceOf Integer) { this.count += (Integer) msg; } else if (msg instanceOf String) { if ( ((String)msg).equals("getCount") ) { getSender().tell(this.count, getSelf()); } } } } // create actor (code omitted) incActor.tell(3, ActorRef.noSender());
// continued from previous slide Patterns.ask(incActor, "getCount", 1000).onComplete( new OnComplete<Object>() { public void onComplete(Throwable failure, Object result) { if (failure == null) { System.out.println(new Integer((int)result).toString()); } } }, system.dispatcher() // execution context );
Keep reading for an inline (uglier) version
Using futures, you can make concurrent URL requests. With this you can make a simple “race” game. Let’s assume we wanted to have a race of the search engines, we could use three URLs like
http://www.google.com/#q=scala http://www.bing.com/?q=scala https://duckduckgo.com/?q=scala
You could them time the completion of each request and print out the results once all of the futures have completed.
There is a twist to what you are going to be implementing. For each URL (whatever those URLs may be, choose wisely), you need to make more than one request. Let’s say that you actually need to make some number > 20 requests. That means that if you choose to race 3 URLs for 100 each, your application will make 300 requests. However, for each URL, the requests need to be made serially. That means if you have 3 URLs, you will be making a max of 3 URL calls in parallel.
When this is completed you should print out the results of your race including total time taken (for the race), the average response time per URL, and the winner of the race.
Requirements
Resources
Starter Template - download to get started with your project
scalaj-http - HTTP Client included in starter project
simple “timing” wrapper for timing various operations
scala worksheets - Runnable worksheets from previous workshop
By John Murray