Don't Block
How to Mess Up Akka and Spray
Zach Cox - @zcox - zach.cox@banno.com
iascala - Feb 2014
Purpose
- Scala provides great concurrency tools
- Very easy to mess up
- Demonstrate how blocking can prevent processing
- Provide solutions
Things We Will Block
- Threads (Java)
- Futures (Scala)
- Actors (Akka)
- Spray
Spoiler
All about blocking threads
What is Blocking?
- Code that takes a long time to run
- Network I/O
- HTTP requests
- Database access
- File I/O
- Really heavy computation
- Nothing after the blocking function can run on this thread until it is done
Java: Runnable, Executor
trait Runnable {
def run(): Unit
}
trait Executor {
def execute(command: Runnable): Unit
}
ThreadPoolExecutor
class ThreadPoolExecutor extends Executor {
val pool: Set[Thread] = Set.empty
val tasks: BlockingQueue[Runnable] = ???
def execute(command: Runnable): Unit = {
//run command on new thread, or queue it, or reject it, depending on settings...
}
}
How to block ThreadPoolExecutor
- Execute tasks that block in run() method
- All threads in pool get blocked
- Tasks are queued before execution (until queue is full)
- How it is supposed to work...
- Need to be aware of the blocking though
How to block ForkJoinPool
Solutions for Blocked Executor
val executor = Executors.newFixedThreadPool(moreThreads)
val executor1 = Executors.newFixedThreadPool(pool1Size)
val executor2 = Executors.newFixedThreadPool(pool2Size)
Java: Callable, Future, ExecutorService
trait Callable[V] {
def call(): V
}
trait Future[V] {
def isDone(): Boolean
def get(): V
}
trait ExecutorService extends Executor {
def submit(task: Runnable): Future[_]
def submit[T](task: Callable[T]): Future[T]
}
Future[T]
- Monad that eventually contains either:
- A value of type T (success)
- A Throwable (failure)
-
Future[T] is the read-side; Promise[T] is the write-side
- Value is computed and placed into promise/future on some other thread (usually)
//TODO compelling example of Future...
ExecutionContext
- Runs code that asynchronously completes futures
- Scala version of Executor/ExecutorService
- Implementations usually wrap one
-
Executor => ExecutionContext
-
ExecutorService => ExecutionContext
-
Future.apply runs body function using ExecutionContext
- Wraps the body function in a Runnable
- Executes that Runnable on an ExecutionContext
- That Runnable completes a Promise
-
ExecutionContext.global
- Tries to use ForkJoinPool
- Falls back to ThreadPoolExecutor
How to block Future/ExecutionContext
-
ExecutionContext just wraps an Executor/ExecutorService
-
ExecutionContext.global usually wraps either a ForkJoinPool or a ThreadPoolExecutor
- We already know how to block those
- tldr function passed to Future.apply blocks the underlying thread, exhaust the pool
Solutions for blocked Future/ExecutionContext
java ... \
-Dscala.concurrent.context.minThreads=8 \
-Dscala.concurrent.context.numThreads=16 \
-Dscala.concurrent.context.maxThreads=24 \
...
implicit val c = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(123))
implicit val defaultContext = ExecutionContext.global
val databaseContext = ExecutionContext.fromExecutor(null)
Future("default processing")
Future("database operations")(databaseContext)
Actors and Dispatchers
-
actor ! msg
-
msg placed in actor''s Mailbox queue
-
Mailbox is a Runnable
-
Mailbox executed on dispatcher''s ExecutorService
- By default, all actors use the same default dispatcher
How to Block Actors
- Remember how to block Executor? Do that.
- Block in Actor.receive
- Enough blocked actors will exhaust the dispatcher''s thread pool
Solutions for Blocked Actors
blocking2-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
}
val blocking = system.actorOf(Props[BlockingActor]
.withRouter(FromConfig())
.withDispatcher("blocking2-dispatcher"), "blocking2")
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 10.0
parallelism-max = 100
}
}
Fun Fact
MessageDispatcher is an ExecutionContext
val jdbcContext = system.dispatchers.lookup("jdbc-dispatcher")
Future(useTheDatabase)(jdbcContext)
Spray
- spray-io/akka-io: Java NIO + Actors
- spray-can: HTTP server & client built on spray-io
- spray-routing: HTTP request/response DSL
How to Block Spray
- Built on actors
- We know how to block those
-
SimpleRoutingApp uses a single actor to route all requests (!!!)
- That actor synchronously calls runRoute - easily blocked!
- That actor uses default Akka dispatcher - easily blocked!
- Example code
Solutions for Blocking Spray
- Do not call blocking functions directly in routes
- Instead detach to Future or Actor
- Spray can complete a response using a Future
- Use separate dispatchers
- Give Spray its own dispatcher(s)
- Give your blocking code its own dispatcher(s)
Java NIO and spray-client
- Blocking I/O: One thread per socket
- Non-blocking I/O: One thread, many sockets
- No network I/O (web service client, database, etc) libraries use it!
- Except spray-client...
- Start writing Scala web service clients using spray-can!
- Other protocols (TCP, SMTP, XMPP, various DBs, etc) can use akka-io
Banno is Hiring Scala Developers!
zach.cox@banno.com