An Introduction to Akka Streams – Erik LaBianca





akka-streams-intro

A presentation introducing Akka Streams.

On Github easel / akka-streams-intro

An Introduction to Akka Streams

Erik LaBianca

May 11, 2016

@easel @easel erik.labianca@gmail.com https://www.linkedin.com/in/eriklabianca https://erik.labianca.org/akka-streams-intro https://github.com/easel/akka-streams-intro

Overview

  • What's a Stream?
  • The Streams Ecosystem
  • Why Do I Care?
    • Web Spider
  • Anatomy of an Akka Stream
  • Other Operations
  • Other Use Cases
  • Custom Sources
  • Custom Graph Stages

A Stream

is A Continuous Flow

from a Source

to a Sink

A Reactive Stream

is non-blocking

and Supports back pressure
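Back pressure can be pictured as the consumer telling the producer how many elements it is ready for. The sketch below is a simplified, synchronous model of that demand signal. `DemandSource` and its `request` method are hypothetical names chosen for illustration and are not part of the Reactive Streams or Akka APIs.

```scala
// Hypothetical, simplified model of demand signalling (not the Reactive Streams API).
final class DemandSource[A](elems: Iterator[A]) {
  // Emit at most `n` elements to `onNext`; return how many were actually emitted.
  def request(n: Int)(onNext: A => Unit): Int = {
    var sent = 0
    while (sent < n && elems.hasNext) {
      onNext(elems.next())
      sent += 1
    }
    sent
  }
}

object DemandDemo {
  // An effectively unbounded source: nothing is produced until demand arrives.
  val source = new DemandSource(Iterator.from(1))
  val received = scala.collection.mutable.ArrayBuffer.empty[Int]

  // The consumer asks for 3 elements, then 2 more; the producer never runs ahead.
  val firstBatch: Int = source.request(3)(x => received += x)
  val secondBatch: Int = source.request(2)(x => received += x)
}
```

Because the source only emits in response to `request`, a slow consumer simply asks less often, and nothing piles up in between.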

Reactive Streams Ecosystem

Async Sync Streams Sources Sinks
  • Akka Streams
  • fs2
  • Monix
  • ReactiveX
  • InputStream
  • Channels
  • Stream,Iterator
  • Unix Pipes
  • any Iterator
  • nio AsyncChannels
  • scala Futures
  • DB queries
  • collection
  • files
  • network
  • streams
Stream Sources (Services)
  • Apache Kafka
  • AWS Kinesis
  • AWS SQS, SNS
Stream Processing (Platforms)
  • Spark Streaming
  • Apache Flink
  • AWS Lambda
Stream Sources (Libraries)
  • Play
  • Slick

Problem: Mirror Stack Overflow

What could be easier?

def getUrlContent(url: String) = Http(url).asString.body

val FirstQuestionId = 4
val LastQuestionId = 37120353

val ids = Range(FirstQuestionId, LastQuestionId)

// URLs are always http://stackoverflow.com/questions/$id.
val urls = ids.map(id => s"http://stackoverflow.com/questions/$id")

urls.take(10).map(getUrlContent).toList.map(_.length)

Some utility functions

scala> /**
     |  * Time a function call returning a tuple of the elapsed time and the result
     |  */
     | def ptime[A](f: => A): (String, A) = {
     |   val t0 = System.nanoTime
     |   val ans = f
     |   val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
     |   (elapsed, ans)
     | }
ptime: [A](f: => A)(String, A)

scala> ptime("My function result")
res1: (String, String) = (0.000 sec,My function result)

And the imports and implicits

import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scalaj.http._

Noooooo!

java.lang.OutOfMemoryError: Java heap space
  at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:68)
  at java.lang.StringBuilder.<init>(StringBuilder.java:112)
  at scala.StringContext.standardInterpolator(StringContext.scala:123)
  at scala.StringContext.s(StringContext.scala:95)
  at $anonfun$1.apply(<console>:13)
  at $anonfun$1.apply(<console>:13)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.Range.foreach(Range.scala:160)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  ... 21 elided

What happened?

Scala attempted to materialize the entire list in memory.
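The difference between strict and on-demand evaluation can be made visible by counting how often the mapping function runs. This is a small sketch on a 1,000-element range rather than the full ID range. `toUrl` and the `evaluated` counter are illustrative names, not part of the original code.

```scala
object StrictVsLazy {
  var evaluated = 0

  // Illustrative stand-in for the URL-building step; counts how often it runs.
  def toUrl(id: Int): String = {
    evaluated += 1
    s"http://stackoverflow.com/questions/$id"
  }

  // Strict: mapping a Range builds every element before `take` is applied.
  evaluated = 0
  val strictFirst: List[String] = Range(4, 1004).map(toUrl).take(10).toList
  val strictEvaluations: Int = evaluated

  // On demand: an Iterator only evaluates the elements that are actually pulled.
  evaluated = 0
  val lazyFirst: List[String] = Range(4, 1004).iterator.map(toUrl).take(10).toList
  val lazyEvaluations: Int = evaluated
}
```

Both variants yield the same ten URLs, but the strict one builds all 1,000 strings first. At 37 million IDs, that difference is what exhausts the heap.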

Laziness to the Rescue

Iterate all the Things!

scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String

scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator

scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator

scala> urls.next()
res2: String = http://stackoverflow.com/questions/4

scala> ptime(urls.take(10).map(getUrlContent).toList.map(_.length))
res3: (String, List[Int]) = (0.641 sec,List(195, 210, 195, 166, 166, 172, 171, 177, 157, 178))

What if we need to throttle?

def getUrlContentThrottled(url: String) = {
    Thread.sleep(100)
    Http(url).asString.body
}
scala> ptime(urls.take(10).map(getUrlContentThrottled).toList.map(_.length))
res4: (String, List[Int]) = (1.256 sec,List(184, 190, 151, 157, 181, 172, 172, 172, 174, 164))

What if we need to parallelize?

scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]

scala> lazy val futures = urls.take(10).toList.map(getAsync)
futures: List[scala.concurrent.Future[String]] = <lazy>

scala> lazy val futureSeq = Future.sequence(futures)
futureSeq: scala.concurrent.Future[List[String]] = <lazy>

scala> ptime(Await.result(futureSeq, 10.seconds).map(_.length))
res5: (String, List[Int]) = (0.274 sec,List(162, 157, 177, 163, 163, 168, 216, 168, 184, 170))
  • Are we still throttling?
  • What if we need to make 1,000,000 calls, 10 at a time?
  • What happens if we need to make 10,000 calls concurrently?
  • What happens if we need to retry?
  • What happens if the upstream server is really slow?
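One way to approach the "10 at a time" question with plain Futures is to start each batch only after the previous one has completed. The following is a minimal sketch under that assumption. `inBatches`, `fakeFetch` and the `example.invalid` URLs are hypothetical stand-ins for illustration. It does not throttle or retry.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchedCalls {
  // Run `f` over `items` one batch at a time; a batch starts only after the previous one completes.
  def inBatches[A, B](items: Iterator[A], batchSize: Int)(f: A => Future[B]): Future[Vector[B]] = {
    val batches = items.grouped(batchSize)
    def loop(acc: Vector[B]): Future[Vector[B]] =
      if (!batches.hasNext) Future.successful(acc)
      else Future.sequence(batches.next().map(f)).flatMap(results => loop(acc ++ results))
    loop(Vector.empty)
  }

  // Demo with a hypothetical stand-in for the HTTP call; returns (results, peak concurrency).
  def demo(): (Vector[Int], Int) = {
    val lock = new Object
    var inFlight = 0
    var peak = 0
    def fakeFetch(url: String): Future[Int] = Future {
      lock.synchronized { inFlight += 1; peak = math.max(peak, inFlight) }
      Thread.sleep(5) // simulated latency
      lock.synchronized { inFlight -= 1 }
      url.length
    }
    val urls = Iterator.range(0, 35).map(i => s"http://example.invalid/$i")
    val lengths = Await.result(inBatches(urls, 10)(url => fakeFetch(url)), 30.seconds)
    (lengths, lock.synchronized(peak))
  }
}
```

This caps concurrency at the batch size, but one slow call stalls the whole batch, and each in-flight call still blocks a thread while it sleeps.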

Eliminating Blocking

  • 10,000 concurrent calls
    • Requires 10,000 threads
    • With a 1 MB stack per thread, that is roughly 10 GB of RAM before any heap.
  • Naive throttling waits the initial delay and then destroys the upstream server.
  • Other negative side effects notwithstanding.
  • Net-Net, we need to get rid of the Await.result
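The memory figure above follows from simple arithmetic. The snippet assumes a stack size of 1 MiB per thread (for example via `-Xss1m`), which is an assumption for illustration and varies by JVM and platform.

```scala
object ThreadStackCost {
  val threads = 10000L
  val stackBytesPerThread = 1024L * 1024L // assumption: 1 MiB per thread stack
  val totalBytes: Long = threads * stackBytesPerThread
  val totalGiB: Double = totalBytes.toDouble / (1024L * 1024L * 1024L)
}
```

That works out to a little under 10 GiB reserved for stacks alone, before any heap is allocated.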

Actor Model

  • Eliminates Explicit Locking and Thread Management
  • Encapsulated
  • Location Agnostic
  • Communicate Exclusively via Messages
  • Thread-safe within Receive Method
  • Single receive method

Imports and implicits

import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._

Some utility functions

scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator

scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator

scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String

scala> def getUrlContentThrottled(url: String) = {
     |     Thread.sleep(100)
     |     Http(url).asString.body
     | }
getUrlContentThrottled: (url: String)String

scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]

scala> /**
     |  * Time a function call returning a tuple of the elapsed time and the result
     |  */
     | def ptime[A](f: => A): (String, A) = {
     |   val t0 = System.nanoTime
     |   val ans = f
     |   val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
     |   (elapsed, ans)
     | }
ptime: [A](f: => A)(String, A)

scala> ptime("My function result")
res2: (String, String) = (0.000 sec,My function result)

object SpiderActor {
  import scala.util.Try // needed for promise.complete(Try(lengths)) below
  sealed abstract class Message
  case class Request(url: String) extends Message
  case class Response(body: String) extends Message

  def getUrlContent(url: String): Future[Response] = Future{
    Response(Http(url).asString.body)
  }

  val ThrottleDelay = 100.milliseconds
  val MaxInFlight = 10

  def run(urls: Iterator[String]): Future[Seq[Int]] = {
    val promise = Promise[Seq[Int]]
    var lengths = Seq.empty[Int]

    def persist(body: String) = Future.successful {
      lengths :+= body.length
    }

    val system = ActorSystem()

    println("Starting SpiderActor")
    val actorRef = system.actorOf(Props(new SpiderActor(urls, persist)), "spider")
    println("SpiderActor Started")

    system.whenTerminated.map { x =>
      println("SpiderActor Shut Down")
      promise.complete(Try(lengths))
    }
    promise.future
  }
}

class SpiderActor(urls: Iterator[String], persist: (String) => Future[Unit]) extends Actor {
  import SpiderActor._
  import akka.pattern.pipe // brings pipeTo into scope

  var inFlight = 0

  private def nextRequest() = {
    if(inFlight < MaxInFlight && urls.nonEmpty) {
      inFlight += 1
      context.system.scheduler.scheduleOnce(ThrottleDelay, self, Request(urls.next))
    }
    else if (inFlight == 0 && urls.isEmpty) self ! PoisonPill
  }

  nextRequest()

  def receive: Receive = {
    case Request(url) =>
      nextRequest()
      getUrlContent(url).pipeTo(self)
    case Response(body: String) =>
      inFlight -= 1
      persist(body).map(_ => nextRequest())
  }

  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    super.postStop()
    context.system.terminate()
  }
}

scala> import com.github.easel._
import com.github.easel._

scala> val u = urls.take(10)
u: Iterator[String] = non-empty iterator

scala> ptime(Await.result(SpiderActor.run(u), 10.seconds))
Starting SpiderActor
SpiderActor Started
res3: (String, Seq[Int]) = (0.950 sec,List(191, 195, 210, 195, 166, 166, 172, 171, 177, 157))

Akka Streams

Source -> Flow -> Sink

import scala.concurrent.duration._, scala.concurrent._,  scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._, com.github.easel._
val source: Source[Int, akka.NotUsed] = Source(0 to 3)
val flow: Flow[Int, String, akka.NotUsed] = Flow[Int].map(_.toString)
val sink: Sink[Any, Future[akka.Done]] = Sink.foreach(println)
scala> def run = {
     |   implicit val actorSystem = ActorSystem()
     |   implicit val materializer = ActorMaterializer()
     |   val result = source.via(flow).runWith(sink)
     |   Await.result(result, 1.seconds)
     |   actorSystem.terminate()
     | }
run: scala.concurrent.Future[akka.actor.Terminated]

scala> run
0
1
2
3
res0: scala.concurrent.Future[akka.actor.Terminated] = List()

object SpiderStream {
  val Concurrency = 10
  val BatchSize = 100

  def source(args: Iterator[String]) = Source.fromIterator(() => args)

  def flow(f: (String) => Future[String]) = Flow[String]
    .throttle(1, 100.millis, 1, ThrottleMode.shaping)
    .mapAsync(Concurrency)(f)
    .map(_.length)
    .grouped(BatchSize)

  def run(args: Seq[String], f: (String) => Future[String]): Future[Seq[Int]] = {
    println("Starting")
    implicit val system = ActorSystem()
    system.whenTerminated.map { x =>
      println("Shut Down")
    }
    implicit val mat: Materializer = ActorMaterializer()

    val result = source(args.iterator)
      .via(flow(f))
      .runWith(Sink.head)

    result.map { x =>
      system.terminate()
      x
    }(system.dispatcher)
  }
}

scala> ptime((SpiderStream.run(urls.take(10).toSeq, (x) => getAsync(x)), 10.seconds))
Starting
res3: (String, (scala.concurrent.Future[Seq[Int]], scala.concurrent.duration.FiniteDuration)) = (0.039 sec,(List(),10 seconds))

Built-in Stages

Single Stream

val singleFlow: Flow[Int, String, akka.NotUsed] = Flow[Int]
  .buffer(10, OverflowStrategy.backpressure) // buffer and apply backpressure when full
  .buffer(10, OverflowStrategy.dropTail) // buffer and drop the youngest buffered element when full
  .delay(10.millis, DelayOverflowStrategy.backpressure) // force a delay downstream
  .throttle(1, 10.millis, 10, ThrottleMode.Shaping) // throttle to 1 element per 10 milliseconds, bursts of up to 10
  .map(_.toString) // map to a new type
  .async // introduce an async boundary
  .grouped(2) // batch every two strings into a Seq("1", "2")
  .mapConcat(identity) // expand the groups back into individual strings
  .mapAsyncUnordered(10)(Future.successful) // do 10 things asynchronously
  .intersperse(",") // intersperse "," similar to mkString
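Several of these stages behave like their namesakes on ordinary collections, which gives a quick way to check what they do to the data. The sketch below uses only the Scala standard library as an analogy. It does not run an Akka stream and says nothing about buffering, timing or asynchrony.

```scala
object SingleStreamAnalogy {
  val ints = List(1, 2, 3, 4, 5)

  // map: convert each element to a new type
  val strings: List[String] = ints.map(_.toString)

  // grouped(2): batch elements into groups of at most two
  val groups: List[List[String]] = strings.grouped(2).toList

  // mapConcat(identity) corresponds to flattening: expand the groups again
  val flattened: List[String] = groups.flatten

  // intersperse(","): put a separator element between neighbours
  val interspersed: List[String] = flattened.flatMap(s => List(",", s)).drop(1)
}
```

Concatenating the interspersed elements gives the same text as `flattened.mkString(",")`, which is the sense in which `intersperse` resembles `mkString`.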

Multi Stream

  • broadcast: (1 input, n outputs) emits each input element to every output
  • zip: (2 inputs, 1 output) creates tuples of (A, B) from a stream of A and a stream of B
  • unzip: (1 input, 2 outputs) splits tuples of (A, B) into a stream of A and a stream of B
  • merge: (n inputs, 1 output) picks elements randomly from its inputs and pushes them one by one to its output
  • concat: (2 inputs, 1 output) consumes the first stream, then the second
  • interleave: (2 inputs, 1 output) emits N elements from A, then N elements from B, and repeats
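The data-level behaviour of some of these operations can be previewed on plain lists. This is a standard-library analogy only. The `interleave` helper is a hypothetical function written for this sketch, and `merge` and `broadcast` are omitted because their behaviour depends on concurrent demand.

```scala
object MultiStreamAnalogy {
  val a = List(1, 2, 3, 4)
  val b = List(10, 20, 30, 40)

  // zip: pair up elements from both inputs
  val zipped: List[(Int, Int)] = a.zip(b)

  // unzip: split the pairs back into two sequences
  val (left, right) = zipped.unzip

  // concat: everything from the first input, then everything from the second
  val concatenated: List[Int] = a ++ b

  // interleave: up to `n` elements from the first input, then up to `n` from the second, repeated;
  // once one input is exhausted the rest of the other follows.
  def interleave[T](x: List[T], y: List[T], n: Int): List[T] =
    if (x.isEmpty) y
    else if (y.isEmpty) x
    else x.take(n) ++ y.take(n) ++ interleave(x.drop(n), y.drop(n), n)

  val interleaved: List[Int] = interleave(a, b, 2)
}
```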

Other Use Cases

  • Stream results from Database via Slick
  • Stream HTTP requests (akka-http / play)
  • Stream HTTP responses (akka-http / play)
  • Stream query results from ElasticSearch
  • Distribute heavy computation using Akka Cluster
  • Replace "work pulling" pattern

Custom Flow Graph

/**
  * Akka Streams graph stage that accepts a *SORTED* stream of (Master, Detail) and
  * groups them into Combined records. Accomplishes the equivalent of groupBy(Master) but optimized
  * for a sorted stream.
  */
class MasterDetailGraphStage[Combined, Master, Detail]
(combine: (Set[(Master, Detail)]) => Combined)
  extends GraphStage[FlowShape[(Master, Detail), Combined]] {
  type MasterDetail = (Master, Detail)
  val in = Inlet[(Master, Detail)]("in")
  val out = Outlet[Combined]("out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var buffer = Set.empty[MasterDetail]
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (buffer.nonEmpty) {
            if (buffer.head._1 == elem._1) {
              buffer = buffer + elem
              pull(in)
            } else {
              push(out, combine(buffer))
              buffer = Set(elem)
            }
          } else {
            buffer = Set(elem)
            pull(in)
          }
        }

        override def onUpstreamFinish(): Unit = {
          if (buffer.nonEmpty) emit(out, combine(buffer))
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
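The grouping logic of the stage (buffer while the master key stays the same, emit when it changes, flush at the end) can be sketched on a plain Iterator. `groupSorted` is a hypothetical helper written for this illustration. It shows the algorithm only and has none of the stage's push/pull or back pressure handling.

```scala
object SortedGrouping {
  // Group consecutive (master, detail) pairs that share a master.
  // Assumes the input is already sorted by master.
  def groupSorted[M, D, C](pairs: Iterator[(M, D)])(combine: Set[(M, D)] => C): Iterator[C] =
    new Iterator[C] {
      private val in = pairs.buffered

      def hasNext: Boolean = in.hasNext

      def next(): C = {
        val master = in.head._1
        var group = Set.empty[(M, D)]
        // Buffer while the master key stays the same; stop at the first different key.
        while (in.hasNext && in.head._1 == master) group += in.next()
        combine(group)
      }
    }
}
```

Because the input is sorted, only one group is held in memory at a time, which is the optimization the stage's comment refers to.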

Custom Source

/**
* Objects which can be identified by a key and ordered
*/
trait Keyed {
  def key: String
}

object Keyed {
  implicit def ordering[T <: Keyed]: Ordering[T] = new Ordering[T] {
    override def compare(x: T, y: T): Int = {
      x.key.compareTo(y.key)
    }
  }
}

/**
* An interface for services which provide paginated results in batches.
*/
trait BatchLoader[T <: Keyed] {
  def load(offset: Option[String]): Future[Batch[T]]

  def source(implicit ec: ExecutionContext): Source[T, ActorRef] = {
    Source.actorPublisher(BatchSource.props[T](this))
  }
}
class BatchSource[A <: Keyed](loader: BatchLoader[A])(implicit ec: ExecutionContext) extends ActorPublisher[A] {
  import BatchSource._
  import akka.pattern.pipe // brings pipeTo into scope

  import akka.stream.actor.ActorPublisherMessage._

  private var first = true
  private var nextOffset: Option[String] = None
  private var buffer: Seq[A] = Seq.empty

  def receive: Receive = waitingForDownstreamReq(0)

  case object Pull

  private def shouldLoadMore = {
    nextOffset.isDefined && (totalDemand > 0 || buffer.length < BUFFER_AMOUNT)
  }

  def waitingForDownstreamReq(offset: Long): Receive = {
    case Request(_) | Pull =>
      val sent = if (buffer.nonEmpty) {
        sendFromBuff(totalDemand)
      } else {
        0
      }
      if (first || (shouldLoadMore && isActive)) {
        first = false
        loader.load(nextOffset).pipeTo(self)
        context.become(waitingForFut(offset + sent, totalDemand))
      }

    case Cancel => context.stop(self)
  }

  def sendFromBuff(demand: Long): Long = {
    val consumed = buffer.take(demand.toInt).toList
    buffer = buffer.drop(consumed.length)
    consumed.foreach(onNext)
    if (nextOffset.isEmpty && buffer.isEmpty) {
      onComplete()
    }
    consumed.length.toLong
  }

  def waitingForFut(s: Long, beforeFutDemand: Long): Receive = {
    case batch: Batch[A] =>
      nextOffset = if (batch.items.isEmpty) {
        None
      } else {
        batch.nextOffset
      }
      buffer = buffer ++ batch.items
      val consumed = sendFromBuff(beforeFutDemand)
      self ! Pull
      context.become(waitingForDownstreamReq(s + consumed))

    case Request(_) | Pull => // ignoring until we receive the future response

    case Status.Failure(err) =>
      context.become(waitingForDownstreamReq(s))
      onError(err)

    case Cancel => context.stop(self)
  }
}

object BatchSource {
  final val BUFFER_AMOUNT = 1000
  def props[T <: Keyed](loader: BatchLoader[T])(implicit ec: ExecutionContext): Props = {
    Props(new BatchSource[T](loader))
  }
}
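The paging logic in `BatchSource` (load a page, hand out its items, follow `nextOffset` until it is empty) can be sketched synchronously with an Iterator. `Page` and `paged` are hypothetical names for this illustration. `Page` mirrors the `items` and `nextOffset` fields the code above reads from `Batch`. The loader here is a plain blocking function, not a Future.

```scala
// Hypothetical page type with the two fields BatchSource reads from a batch.
final case class Page[T](items: Seq[T], nextOffset: Option[String])

object Paging {
  // Walk all pages on demand, loading the next one only when the buffer runs dry.
  def paged[T](load: Option[String] => Page[T]): Iterator[T] = new Iterator[T] {
    private var buffer: List[T] = Nil
    private var offset: Option[String] = None
    private var first = true

    private def fill(): Unit =
      while (buffer.isEmpty && (first || offset.isDefined)) {
        first = false
        val page = load(offset)
        buffer = page.items.toList
        // As in BatchSource, an empty page ends the stream.
        offset = if (page.items.isEmpty) None else page.nextOffset
      }

    def hasNext: Boolean = { fill(); buffer.nonEmpty }

    def next(): T = {
      if (!hasNext) throw new NoSuchElementException("no more pages")
      val head = buffer.head
      buffer = buffer.tail
      head
    }
  }
}
```

Nothing is loaded until the first element is requested, so the consumer's pace decides how quickly pages are fetched.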

Thanks!

Questions?

Further Resources
