On GitHub: easel/akka-streams-intro
May 11, 2016
What could be easier?
/** Fetches the body of `url` with a blocking HTTP GET (scalaj-http). */
def getUrlContent(url: String) = Http(url).asString.body

val FirstQuestionId = 4
val LastQuestionId = 37120353
// Range's upper bound is exclusive, so LastQuestionId itself is not included.
val ids = Range(FirstQuestionId, LastQuestionId)
// URLs are always http://stackoverflow.com/questions/$i.
// NOTE: Range.map is strict, so this eagerly builds all ~37 million URL strings at once
// and can exhaust the heap before a single request is made.
val urls = ids.map(id => s"http://stackoverflow.com/questions/$id")

urls.take(10).map(getUrlContent).toList.map(_.length)
scala> /** | * Time a function call returning a tuple of the elapsed time and the result | */ | def ptime[A](f: => A): (String, A) = { | val t0 = System.nanoTime | val ans = f | val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec" | (elapsed, ans) | } ptime: [A](f: => A)(String, A) scala> ptime("My function result") res1: (String, String) = (0.000 sec,My function result)
import scala.concurrent.duration._ import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import scalaj.http._
java.lang.OutOfMemoryError: Java heap space at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:68) at java.lang.StringBuilder.<init>(StringBuilder.java:112) at scala.StringContext.standardInterpolator(StringContext.scala:123) at scala.StringContext.s(StringContext.scala:95) at $anonfun$1.apply(<console>:13) at $anonfun$1.apply(<console>:13) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) ... 21 elided
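Scala attempted to materialize the entire list in memory: `Range.map` is strict, so it tried to build all ~37 million URL strings at once and ran out of heap space.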
scala> def getUrlContent(url: String) = Http(url).asString.body getUrlContent: (url: String)String scala> val idsIter = Range(4, 37120353).iterator idsIter: Iterator[Int] = non-empty iterator scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id") urls: Iterator[String] = non-empty iterator scala> urls.next() res2: String = http://stackoverflow.com/questions/4 scala> ptime(urls.take(10).map(getUrlContent).toList.map(_.length)) res3: (String, List[Int]) = (0.641 sec,List(195, 210, 195, 166, 166, 172, 171, 177, 157, 178))
/**
 * Fetches the body of `url`, first blocking the calling thread for 100 ms as a crude
 * rate limit.
 */
def getUrlContentThrottled(url: String): String = {
  Thread.sleep(100)
  val response = Http(url).asString
  response.body
}
scala> ptime(urls.take(10).map(getUrlContentThrottled).toList.map(_.length)) res4: (String, List[Int]) = (1.256 sec,List(184, 190, 151, 157, 181, 172, 172, 172, 174, 164))
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url)) getAsync: (url: String)scala.concurrent.Future[String] scala> lazy val futures = urls.take(10).toList.map(getAsync) futures: List[scala.concurrent.Future[String]] = <lazy> scala> lazy val futureSeq = Future.sequence(futures) futureSeq: scala.concurrent.Future[List[String]] = <lazy> scala> ptime(Await.result(futureSeq, 10.seconds).map(_.length)) res5: (String, List[Int]) = (0.274 sec,List(162, 157, 177, 163, 163, 168, 216, 168, 184, 170))
import scala.concurrent.duration._ import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._
| val idsIter = Range(4, 37120353).iterator idsIter: Iterator[Int] = non-empty iterator scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id") urls: Iterator[String] = non-empty iterator scala> def getUrlContent(url: String) = Http(url).asString.body getUrlContent: (url: String)String scala> def getUrlContentThrottled(url: String) = { | Thread.sleep(100) | Http(url).asString.body | } getUrlContentThrottled: (url: String)String scala> def getAsync(url: String) = Future(getUrlContentThrottled(url)) getAsync: (url: String)scala.concurrent.Future[String] scala> /** | * Time a function call returning a tuple of the elapsed time and the result | */ | def ptime[A](f: => A): (String, A) = { | val t0 = System.nanoTime | val ans = f | val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec" | (elapsed, ans) | } ptime: [A](f: => A)(String, A) scala> ptime("My function result") res2: (String, String) = (0.000 sec,My function result)
object SpiderActor {
  /** Messages the SpiderActor sends to itself. */
  sealed abstract class Message
  /** Fetch `url`; scheduled to self so requests are spaced by ThrottleDelay. */
  case class Request(url: String) extends Message
  /** Body of a fetched URL, piped back to self. */
  case class Response(body: String) extends Message

  /**
   * Fetches `url` asynchronously. The scalaj-http call blocks the thread, so it is marked
   * with `blocking` to let the execution context compensate with extra threads.
   */
  def getUrlContent(url: String): Future[Response] = Future {
    scala.concurrent.blocking {
      Response(Http(url).asString.body)
    }
  }

  /** Delay before each scheduled request. */
  val ThrottleDelay = 100.milliseconds
  /** Maximum number of requests scheduled or awaiting completion at once. */
  val MaxInFlight = 10

  /**
   * Spiders every URL in `urls` using a fresh ActorSystem.
   *
   * @return a Future completed with the body lengths, in the order they were persisted,
   *         once the actor has stopped and the ActorSystem has terminated
   */
  def run(urls: Iterator[String]): Future[Seq[Int]] = {
    // Vector gives effectively constant-time append; appending to a List (the default Seq)
    // is O(n) per element and quadratic over a long URL stream.
    var lengths: Seq[Int] = Vector.empty[Int]
    def persist(body: String) = Future.successful { lengths :+= body.length }
    val system = ActorSystem()
    println("Starting SpiderActor")
    system.actorOf(Props(new SpiderActor(urls, persist)), "spider")
    println("SpiderActor Started")
    // The actor terminates the system when it finishes; complete with the collected lengths then.
    system.whenTerminated.map { _ =>
      println("SpiderActor Shut Down")
      lengths
    }
  }
}
/**
 * Actor that fetches each URL from `urls`, spacing requests by SpiderActor.ThrottleDelay and
 * keeping at most SpiderActor.MaxInFlight requests outstanding. Every fetched body is handed
 * to `persist`. When the iterator is exhausted and nothing is outstanding, the actor stops
 * itself, and postStop terminates the ActorSystem.
 *
 * Mutable state (`inFlight` and the `urls` iterator) is only touched while handling a
 * message. Asynchronous results are piped back to `self` instead of being handled in Future
 * callbacks, which would run on other threads.
 */
class SpiderActor(urls: Iterator[String], persist: (String) => Future[Unit]) extends Actor {
  import SpiderActor._

  /** Sent to self once `persist` has finished with a response body. */
  private case object Persisted

  /** Requests scheduled or fetched but not yet fully processed. */
  var inFlight = 0

  /** Schedules the next request if under the limit, or stops the actor when all work is done. */
  private def nextRequest(): Unit = {
    if (inFlight < MaxInFlight && urls.nonEmpty) {
      inFlight += 1
      context.system.scheduler.scheduleOnce(ThrottleDelay, self, Request(urls.next()))
    } else if (inFlight == 0 && urls.isEmpty) {
      self ! PoisonPill
    }
  }

  nextRequest()

  def receive: Receive = {
    case Request(url) =>
      nextRequest()
      getUrlContent(url).pipeTo(self)
    case Response(body: String) =>
      // Wait for persist to finish, but come back through the mailbox rather than
      // touching actor state from the Future's callback thread.
      persist(body).map(_ => Persisted).pipeTo(self)
    case Persisted =>
      inFlight -= 1
      nextRequest()
    case akka.actor.Status.Failure(_) =>
      // The fetch or persist for one request failed. Count it as finished so the spider
      // keeps going and can still shut down; otherwise inFlight never reaches 0.
      inFlight -= 1
      nextRequest()
  }

  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    super.postStop()
    context.system.terminate()
  }
}
scala> import com.github.easel._ import com.github.easel._ scala> val u = urls.take(10) u: Iterator[String] = non-empty iterator scala> ptime(Await.result(SpiderActor.run(u), 10.seconds)) Starting SpiderActor SpiderActor Started res3: (String, Seq[Int]) = (0.950 sec,List(191, 195, 210, 195, 166, 166, 172, 171, 177, 157))
import scala.concurrent.duration._, scala.concurrent._, scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._, com.github.easel._

// Emits 0, 1, 2 and 3, then completes.
val source: Source[Int, akka.NotUsed] = Source(List(0, 1, 2, 3))
// Renders each integer as its decimal string.
val flow: Flow[Int, String, akka.NotUsed] = Flow[Int].map(n => n.toString)
// Prints every element; the materialized Future[Done] completes when the stream ends.
val sink: Sink[Any, Future[akka.Done]] = Sink.foreach[Any](elem => println(elem))
scala> def run = { | implicit val actorSystem = ActorSystem() | implicit val materializer = ActorMaterializer() | val result = source.via(flow).runWith(sink) | Await.result(result, 1.seconds) | actorSystem.terminate() | } run: scala.concurrent.Future[akka.actor.Terminated] scala> run 0 1 2 3 res0: scala.concurrent.Future[akka.actor.Terminated] = List()
scala> val idsIter = Range(4, 37120353).iterator idsIter: Iterator[Int] = non-empty iterator scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id") urls: Iterator[String] = non-empty iterator scala> def getUrlContent(url: String) = Http(url).asString.body getUrlContent: (url: String)String scala> def getUrlContentThrottled(url: String) = { | Thread.sleep(100) | Http(url).asString.body | } getUrlContentThrottled: (url: String)String scala> def getAsync(url: String) = Future(getUrlContentThrottled(url)) getAsync: (url: String)scala.concurrent.Future[String] scala> /** | * Time a function call returning a tuple of the elapsed time and the result | */ | def ptime[A](f: => A): (String, A) = { | val t0 = System.nanoTime | val ans = f | val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec" | (elapsed, ans) | } ptime: [A](f: => A)(String, A) scala> ptime("My function result") res2: (String, String) = (0.000 sec,My function result)
object SpiderStream {
  /** Maximum number of `f` calls running at once. */
  val Concurrency = 10
  /** Maximum number of body lengths per emitted batch. */
  val BatchSize = 100

  /**
   * Source over `args`. The factory always returns the same iterator, so the Source is
   * effectively single-use: a second materialization only sees what is left.
   */
  def source(args: Iterator[String]) = Source.fromIterator(() => args)

  /**
   * Fetches bodies and emits their lengths in batches.
   *
   * The flow throttles to one element per 100 ms and calls `f` with up to Concurrency calls
   * in flight, keeping input order. It maps each body to its length and groups the lengths
   * into batches of at most BatchSize.
   */
  def flow(f: (String) => Future[String]) =
    Flow[String]
      .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
      .mapAsync(Concurrency)(f)
      .map(_.length)
      .grouped(BatchSize)

  /**
   * Runs `args` through [[flow]] on a fresh ActorSystem, which is terminated once the stream
   * completes or fails.
   *
   * @return the lengths of all fetched bodies, in input order (empty for empty `args`)
   */
  def run(args: Seq[String], f: (String) => Future[String]): Future[Seq[Int]] = {
    println("Starting")
    implicit val system = ActorSystem()
    system.whenTerminated.map { _ => println("Shut Down") }
    implicit val mat: Materializer = ActorMaterializer()
    val result: Future[Seq[Int]] = source(args.iterator)
      .via(flow(f))
      // Concatenate every batch. Sink.head would keep only the first BatchSize lengths
      // and fail on an empty stream.
      .runWith(Sink.fold[Vector[Int], Seq[Int]](Vector.empty[Int])(_ ++ _))
    // andThen runs on success *and* failure, so the ActorSystem is never leaked.
    result.andThen { case _ => system.terminate() }(system.dispatcher)
  }
}
scala> ptime((SpiderStream.run(urls.take(10).toSeq, (x) => getAsync(x)), 10.seconds)) Starting res3: (String, (scala.concurrent.Future[Seq[Int]], scala.concurrent.duration.FiniteDuration)) = (0.039 sec,(List(),10 seconds))
/** A tour of common linear Flow operators, chained together purely for illustration. */
val singleFlow: Flow[Int, String, akka.NotUsed] = Flow[Int]
  .buffer(10, OverflowStrategy.backpressure)            // buffer up to 10 elements; backpressure upstream when full
  .buffer(10, OverflowStrategy.dropTail)                // buffer up to 10 elements; when full, drop the youngest buffered one
  .delay(10.millis, DelayOverflowStrategy.backpressure) // delay each element by 10 ms before passing it downstream
  .throttle(1, 10.millis, 10, ThrottleMode.Shaping)     // at most 1 element per 10 ms, bursts of up to 10
  .map(_.toString)                                      // map each Int to a String
  .async                                                // introduce an async boundary
  .grouped(2)                                           // group consecutive pairs, e.g. Seq("1", "2")
  .mapConcat(identity)                                  // flatten the groups back into single strings
  .mapAsyncUnordered(10)(Future.successful[String])     // up to 10 futures at once; emitted in completion order
  .intersperse(",")                                     // insert "," between elements, similar to mkString
flow
  .broadcast  // (1 input, n outputs) emits every incoming element to each of its outputs
  .zip        // (2 inputs, 1 output) combines a stream of A and a stream of B into a stream of tuples (A, B)
  .unzip      // (1 input, 2 outputs) splits a stream of tuples (A, B) into a stream of A and a stream of B
  .merge      // (n inputs, 1 output) emits elements from its inputs one by one as they arrive, picking randomly when several are ready
  .concat     // (2 inputs, 1 output) emits all of the first stream, then all of the second
  .interleave // mixes streams: N elements from A, then N elements from B, and repeats
/**
 * Akka Streams graph stage that consumes a stream of (Master, Detail) pairs *SORTED* by
 * Master and emits one Combined value per run of consecutive pairs that share a Master.
 * This is the streaming equivalent of groupBy(Master). Because the input is sorted, only
 * the current group is held in memory.
 *
 * @param combine builds the Combined value for one group. Pairs are gathered in a Set, so
 *                identical (Master, Detail) pairs within a group collapse into one.
 * @tparam Combined type emitted downstream
 * @tparam Master   grouping key, compared with ==
 * @tparam Detail   per-row payload
 */
class MasterDetailGraphStage[Combined, Master, Detail](combine: (Set[(Master, Detail)]) => Combined)
    extends GraphStage[FlowShape[(Master, Detail), Combined]] {

  type MasterDetail = (Master, Detail)

  val in = Inlet[MasterDetail]("in")
  val out = Outlet[Combined]("out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Pairs collected so far for the current Master; empty only before the first element.
      var group = Set.empty[MasterDetail]

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val pair = grab(in)
          val masterChanged = group.nonEmpty && group.head._1 != pair._1
          if (masterChanged) {
            // The previous group is complete: emit it and start a new one. Upstream is
            // pulled again only when downstream asks for more (onPull).
            push(out, combine(group))
            group = Set(pair)
          } else {
            // Same Master (or very first element): keep accumulating.
            group += pair
            pull(in)
          }
        }

        override def onUpstreamFinish(): Unit = {
          // Flush the last, still-open group before completing.
          if (group.nonEmpty) emit(out, combine(group))
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
/**
 * Something identified by a String key; values of any subtype can be ordered by that key.
 */
trait Keyed {
  def key: String
}

object Keyed {
  /** Orders any Keyed subtype by its key, using String's natural (compareTo) ordering. */
  implicit def ordering[T <: Keyed]: Ordering[T] = Ordering.String.on[T](_.key)
}

/**
 * A service that returns its results page by page.
 */
trait BatchLoader[T <: Keyed] {
  /** Loads the page that starts at `offset`; None requests the first page. */
  def load(offset: Option[String]): Future[Batch[T]]

  /** Exposes this loader as a Source whose materialized value is the publishing actor. */
  def source(implicit ec: ExecutionContext): Source[T, ActorRef] = {
    val publisherProps = BatchSource.props[T](this)
    Source.actorPublisher[T](publisherProps)
  }
}
/**
 * ActorPublisher that pages through a BatchLoader and emits the loaded items downstream.
 *
 * - At most one page load is in flight at a time.
 * - After each page arrives, the actor re-checks demand by sending itself `Pull`.
 * - It keeps loading while another page exists and either downstream has demand or fewer
 *   than BUFFER_AMOUNT items are buffered.
 * - The stream completes once the loader yields no further offset (or an empty page) and the
 *   buffer is drained.
 * - A failed load fails the stream.
 */
class BatchSource[A <: Keyed](loader: BatchLoader[A])(implicit ec: ExecutionContext) extends ActorPublisher[A] {
  import BatchSource._
  import akka.stream.actor.ActorPublisherMessage._

  /** True until the first page has been requested; the first load is unconditional. */
  private var first = true
  /** Offset of the next page; None before the first load and once the loader is exhausted. */
  private var nextOffset: Option[String] = None
  /** Items loaded but not yet emitted. */
  private var buffer: Seq[A] = Seq.empty

  def receive: Receive = waitingForDownstreamReq(0)

  /** Self-sent after each page is processed so outstanding demand gets served. */
  case object Pull

  private def shouldLoadMore = {
    nextOffset.isDefined && (totalDemand > 0 || buffer.length < BUFFER_AMOUNT)
  }

  /** Idle state, no load in flight. `offset` counts items emitted so far (bookkeeping only). */
  def waitingForDownstreamReq(offset: Long): Receive = {
    case Request(_) | Pull =>
      val sent = if (buffer.nonEmpty) {
        sendFromBuff(totalDemand)
      } else {
        0
      }
      if (first || (shouldLoadMore && isActive)) {
        first = false
        loader.load(nextOffset).pipeTo(self)
        context.become(waitingForFut(offset + sent, totalDemand))
      }
    case Cancel =>
      context.stop(self)
  }

  /**
   * Emits up to `demand` buffered items. If nothing is left to load and the buffer is empty
   * afterwards, completes the stream.
   *
   * @return the number of items emitted
   */
  def sendFromBuff(demand: Long): Long = {
    // Demand may legitimately be Long.MaxValue (unbounded). A plain toInt would wrap it to -1,
    // take(-1) would emit nothing, and the stream would stall, so clamp to Int.MaxValue.
    val toSend = math.min(demand, Int.MaxValue.toLong).toInt
    val consumed = buffer.take(toSend).toList
    buffer = buffer.drop(consumed.length)
    consumed.foreach(onNext)
    if (nextOffset.isEmpty && buffer.isEmpty) {
      onComplete()
    }
    consumed.length.toLong
  }

  /**
   * Loading state: a page load is in flight.
   *
   * @param s               items emitted so far (bookkeeping only)
   * @param beforeFutDemand demand at the moment the load was started
   */
  def waitingForFut(s: Long, beforeFutDemand: Long): Receive = {
    case batch: Batch[A @unchecked] =>
      // An empty page means the loader is exhausted, whatever offset it reports.
      nextOffset = if (batch.items.isEmpty) {
        None
      } else {
        batch.nextOffset
      }
      buffer = buffer ++ batch.items
      val consumed = sendFromBuff(beforeFutDemand)
      // Demand may have grown while loading; the idle state serves the rest and decides on
      // prefetching the next page.
      self ! Pull
      context.become(waitingForDownstreamReq(s + consumed))
    case Request(_) | Pull =>
      // Ignored while loading; the Pull sent after the page arrives re-checks totalDemand.
    case Status.Failure(err) =>
      context.become(waitingForDownstreamReq(s))
      onError(err)
    case Cancel =>
      context.stop(self)
  }
}

object BatchSource {
  /** Prefetch target: keep loading pages, while more exist, until this many items are buffered. */
  final val BUFFER_AMOUNT = 1000

  def props[T <: Keyed](loader: BatchLoader[T])(implicit ec: ExecutionContext): Props = {
    Props(new BatchSource[T](loader))
  }
}
Further Resources