Don't Block – How to Mess Up Akka and Spray




Talk given at iascala Feb 2014

On Github zcox / iascala-dont-block

Don't Block

How to Mess Up Akka and Spray

Zach Cox - @zcox - zach.cox@banno.com

iascala - Feb 2014

Don ts block

— Horse ebooks (@Horse_ebooks) August 4, 2013

Purpose

  • Scala provides great concurrency tools
  • Very easy to mess up
  • Demonstrate how blocking can prevent processing
  • Provide solutions

Things We Will Block

  • Threads (Java)
  • Futures (Scala)
  • Actors (Akka)
  • Spray

Spoiler

All about blocking threads

What is Blocking?

  • Code that takes a long time to run
  • Network I/O
    • HTTP requests
    • Database access
  • File I/O
  • Really heavy computation
  • Nothing after the blocking function can run on this thread until it is done

Java: Runnable, Executor

trait Runnable {
  def run(): Unit
}

trait Executor {
  def execute(command: Runnable): Unit
}

ThreadPoolExecutor

class ThreadPoolExecutor extends Executor {
  val pool: Set[Thread] = Set.empty
  val tasks: BlockingQueue[Runnable] = ???

  def execute(command: Runnable): Unit = {
    //run command on new thread, or queue it, or reject it, depending on settings...
  }
}

How to block ThreadPoolExecutor

  • Execute tasks that block in run() method
  • All threads in pool get blocked
  • Tasks are queued before execution (until queue is full)
  • This is how a thread pool is supposed to work...
  • Need to be aware of the blocking though
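
The bullets above can be reproduced with a small experiment. This is a minimal sketch, assuming a fixed pool of two threads and using a CountDownLatch to stand in for slow I/O; BlockedPoolDemo and its method name are hypothetical, not from the talk's repository.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BlockedPoolDemo {
  // Returns how many quick tasks managed to run while every pool thread was blocked.
  def quickTasksRunWhileBlocked(poolSize: Int = 2): Int = {
    val executor = Executors.newFixedThreadPool(poolSize)
    val started  = new CountDownLatch(poolSize) // reaches zero once all blockers are running
    val release  = new CountDownLatch(1)        // stands in for slow I/O finishing
    val quickRan = new AtomicInteger(0)
    try {
      // Occupy every thread in the pool with a task that blocks in run().
      (1 to poolSize).foreach { _ =>
        executor.execute(new Runnable {
          def run(): Unit = { started.countDown(); release.await() }
        })
      }
      started.await()
      // These tasks are trivial, but all they can do is wait in the queue.
      (1 to 3).foreach { _ =>
        executor.execute(new Runnable {
          def run(): Unit = { quickRan.incrementAndGet(); () }
        })
      }
      Thread.sleep(200) // plenty of time for three increments, if a thread were free
      quickRan.get()
    } finally {
      release.countDown() // unblock the pool so the queue drains and the pool can shut down
      executor.shutdown()
      executor.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Calling BlockedPoolDemo.quickTasksRunWhileBlocked() should return 0: the quick tasks are accepted, but none of them run until a blocked thread is released.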

ForkJoinPool

How to block ForkJoinPool

Solutions for Blocked Executor

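import java.util.concurrent.Executors
// Option 1: give the pool more threads (moreThreads is a placeholder size)
val executor = Executors.newFixedThreadPool(moreThreads)
// Option 2: use separate pools, so blocking tasks cannot starve the other work
val executor1 = Executors.newFixedThreadPool(pool1Size)
val executor2 = Executors.newFixedThreadPool(pool2Size)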
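
To illustrate why separate pools help, here is a minimal sketch, assuming one pool reserved for blocking work and one for everything else. The names slowPool, fastPool and SeparatePoolsDemo are hypothetical.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object SeparatePoolsDemo {
  // Blocks every thread of one pool, then shows the other pool still responds.
  def fastResultWhileSlowPoolBlocked(): String = {
    val slowPool = Executors.newFixedThreadPool(2) // reserved for blocking work
    val fastPool = Executors.newFixedThreadPool(2) // everything else
    val release  = new CountDownLatch(1)           // stands in for slow I/O finishing
    try {
      // Saturate the slow pool with blocking tasks.
      (1 to 2).foreach { _ =>
        slowPool.execute(new Runnable { def run(): Unit = release.await() })
      }
      // The fast pool is unaffected, so this completes right away.
      val result = fastPool.submit(new Callable[String] {
        def call(): String = "fast task done"
      })
      result.get(1, TimeUnit.SECONDS)
    } finally {
      release.countDown()
      slowPool.shutdown()
      fastPool.shutdown()
    }
  }
}
```

The blocking still happens, but it is confined to a pool where it cannot starve unrelated work.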

Java: Callable, Future, ExecutorService

trait Callable[V] {
  def call(): V
}

trait Future[V] {
  def isDone(): Boolean
  def get(): V
}

trait ExecutorService extends Executor {
  def submit(task: Runnable): Future[_]
  def submit[T](task: Callable[T]): Future[T]
}
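
A small usage sketch of these interfaces from Scala, assuming a single-threaded executor; JavaFutureDemo is a hypothetical name. Note that get() on a Java Future is itself a blocking call.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

object JavaFutureDemo {
  def compute(): Int = {
    val service = Executors.newSingleThreadExecutor()
    try {
      // submit returns immediately; the Callable runs on the executor's thread.
      val future: JFuture[Int] = service.submit(new Callable[Int] {
        def call(): Int = { Thread.sleep(100); 21 * 2 }
      })
      // get() blocks the calling thread until the result is available.
      future.get()
    } finally service.shutdown()
  }
}
```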

Future[T]

  • Monad that eventually contains either:
    • A value of type T (success)
    • A Throwable (failure)
  • Future[T] is the read-side; Promise[T] is the write-side
  • Value is computed and placed into promise/future on some other thread (usually)
//TODO compelling example of Future...
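
One possible illustration of the points above, offered as a minimal sketch rather than the example the talk intended: a success transformed with map, a failure captured and handled with recover, and a Promise acting as the write-side. FutureDemo and its methods are hypothetical names; Await is used only so the results can be inspected.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Success: transform the eventual value without waiting for it.
  def doubled(): Future[Int] = Future(21).map(_ * 2)

  // Failure: the Throwable is captured inside the Future and can be handled later.
  def recovered(): Future[String] =
    Future[String](throw new IllegalStateException("boom")).recover {
      case e: IllegalStateException => "recovered from " + e.getMessage
    }

  // Promise is the write-side; promise.future is the read-side.
  def viaPromise(): Future[String] = {
    val promise = Promise[String]()
    promise.success("written through the promise")
    promise.future
  }

  // Blocking here is only for the demo, so the values can be checked.
  def await[T](f: Future[T]): T = Await.result(f, 2.seconds)
}
```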

ExecutionContext

  • Runs code that asynchronously completes futures
  • Scala version of Executor/ExecutorService
    • Implementations usually wrap one
    • Executor => ExecutionContext
    • ExecutorService => ExecutionContext
  • Future.apply runs body function using ExecutionContext
    • Wraps the body function in a Runnable
    • Executes that Runnable on an ExecutionContext
    • That Runnable completes a Promise
  • ExecutionContext.global
    • Tries to use ForkJoinPool
    • Falls back to ThreadPoolExecutor
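
The three steps listed under Future.apply can be sketched as follows. This is a simplified stand-in written for illustration, not the library's source; FutureApplySketch and its future method are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FutureApplySketch {
  // Simplified stand-in for Future.apply (an assumption, not the real implementation).
  def future[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    // Wrap the body in a Runnable and hand it to the ExecutionContext.
    ec.execute(new Runnable {
      // The Runnable completes the Promise with a success or a failure.
      def run(): Unit = { promise.complete(Try(body)); () }
    })
    promise.future // the caller only sees the read-side
  }

  def demo(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Await.result(future(40 + 2), 2.seconds)
  }
}
```

Seen this way, whatever the body does, including blocking, happens on one of the ExecutionContext's threads.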

How to block Future/ExecutionContext

  • ExecutionContext just wraps an Executor/ExecutorService
  • ExecutionContext.global usually wraps either a ForkJoinPool or a ThreadPoolExecutor
  • We already know how to block those
  • TL;DR: a blocking function passed to Future.apply blocks the underlying thread; enough of them exhaust the pool
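
A minimal sketch of that failure mode, assuming an ExecutionContext backed by a two-thread pool and a CountDownLatch standing in for slow I/O. BlockedFuturesDemo is a hypothetical name.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockedFuturesDemo {
  // Returns true if a trivial Future could not run because the pool was exhausted.
  def quickFutureTimesOut(): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val started = new CountDownLatch(2) // reaches zero once both blockers are running
    val release = new CountDownLatch(1) // stands in for slow I/O finishing
    try {
      // Two futures whose bodies block: both pool threads are now stuck.
      (1 to 2).foreach(_ => Future { started.countDown(); release.await() })
      started.await()
      // Trivial work, but there is no free thread to run it on.
      val quick = Future(1 + 1)
      try { Await.result(quick, 200.millis); false }
      catch { case _: TimeoutException => true }
    } finally {
      release.countDown() // let the blocked futures finish
      pool.shutdown()
    }
  }
}
```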

Solutions for blocked Future/ExecutionContext

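# Option 1: tune the global ExecutionContext with system properties
java ... \
     -Dscala.concurrent.context.minThreads=8 \
     -Dscala.concurrent.context.numThreads=16 \
     -Dscala.concurrent.context.maxThreads=24 \
     ...

// Option 2: back the implicit ExecutionContext with your own thread pool
implicit val c = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(123))

// Option 3: keep the global context as the default and give blocking work its own context
implicit val defaultContext = ExecutionContext.global
// Passing null creates a new ExecutionContext with the default configuration
val databaseContext = ExecutionContext.fromExecutor(null)

Future("default processing")
Future("database operations")(databaseContext)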

Actors and Dispatchers

  • actor ! msg
  • msg placed in actor's Mailbox queue
  • Mailbox is a Runnable
  • Mailbox executed on dispatcher's ExecutorService
  • By default, all actors use the same default dispatcher
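
The flow in the bullets above can be modelled without Akka. The following is a toy model under stated assumptions, not Akka's implementation: ToyMailbox is a hypothetical class that queues String messages and runs itself on an ExecutorService playing the role of the dispatcher.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ToyActorDemo {
  // Hypothetical toy model of a mailbox; it only mirrors the shape described above.
  final class ToyMailbox(dispatcher: ExecutorService)(receive: String => Unit) extends Runnable {
    private val queue     = new ConcurrentLinkedQueue[String]()
    private val scheduled = new AtomicBoolean(false)

    // Analogue of `actor ! msg`: enqueue, then schedule the mailbox if it is idle.
    def !(msg: String): Unit = { queue.add(msg); trySchedule() }

    private def trySchedule(): Unit =
      if (!queue.isEmpty && scheduled.compareAndSet(false, true)) dispatcher.execute(this)

    // Runs on a dispatcher thread and processes queued messages one at a time.
    // If receive blocks, this dispatcher thread is blocked along with it.
    def run(): Unit =
      try {
        var msg = queue.poll()
        while (msg != null) { receive(msg); msg = queue.poll() }
      } finally {
        scheduled.set(false)
        trySchedule() // pick up messages that arrived while this run was finishing
      }
  }

  def demo(): List[String] = {
    val dispatcher = Executors.newFixedThreadPool(2)
    val seen = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(3)
    val actor = new ToyMailbox(dispatcher)(msg => { seen.add(msg); done.countDown() })
    try {
      List("a", "b", "c").foreach(actor ! _)
      done.await(2, TimeUnit.SECONDS)
      seen.toArray(new Array[String](0)).toList
    } finally dispatcher.shutdown()
  }
}
```

Because the mailbox is just a Runnable on a shared pool, the earlier executor examples apply directly: a receive function that blocks holds a dispatcher thread for as long as it blocks.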

How to Block Actors

  • Remember how to block Executor? Do that.
  • Block in Actor.receive
  • Enough blocked actors will exhaust the dispatcher's thread pool

Solutions for Blocked Actors

# Akka config: a dedicated dispatcher for blocking actors
blocking2-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
}

// Scala: run the blocking actors on that dispatcher
val blocking = system.actorOf(Props[BlockingActor]
   .withRouter(FromConfig())
   .withDispatcher("blocking2-dispatcher"), "blocking2")

# Akka config: a dispatcher with a larger thread pool
my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-factor = 10.0
    parallelism-max = 100
  }
}

Fun Fact

MessageDispatcher is an ExecutionContext

val jdbcContext = system.dispatchers.lookup("jdbc-dispatcher")
Future(useTheDatabase)(jdbcContext)

Spray

  • spray-io/akka-io: Java NIO + Actors
  • spray-can: HTTP server & client built on spray-io
  • spray-routing: HTTP request/response DSL

How to Block Spray

  • Built on actors
  • We know how to block those
  • SimpleRoutingApp uses a single actor to route all requests (!!!)
  • That actor synchronously calls runRoute - easily blocked!
  • That actor uses default Akka dispatcher - easily blocked!
  • Example code

Solutions for Blocking Spray

  • Do not call blocking functions directly in routes
    • Instead detach to Future or Actor
    • Spray can complete a response using a Future
  • Use separate dispatchers
    • Give Spray its own dispatcher(s)
    • Give your blocking code its own dispatcher(s)

Java NIO and spray-client

  • Blocking I/O: One thread per socket
  • Non-blocking I/O: One thread, many sockets
  • No network I/O libraries (web service clients, database drivers, etc.) use it!
  • Except spray-client...
  • Start writing Scala web service clients using spray-can!
  • Other protocols (TCP, SMTP, XMPP, various DBs, etc.) can use akka-io

Banno is Hiring Scala Developers!

zach.cox@banno.com