On GitHub: easel/akka-streams-intro
May 11, 2016
What could be easier?
/** Downloads `url` and returns the response body. */
def getUrlContent(url: String) = Http(url).asString.body

val FirstQuestionId = 4
val LastQuestionId = 37120353
// Range's upper bound is exclusive, so LastQuestionId itself is not included.
val ids = Range(FirstQuestionId, LastQuestionId)

// URLs are always http://stackoverflow.com/questions/$id.
// NOTE: `ids` is a cheap Range, but `ids.map` is strict: it builds all ~37 million URL
// strings up front, before take(10) ever runs.
val urls = ids.map(id => s"http://stackoverflow.com/questions/$id")

urls.take(10).map(getUrlContent).toList.map(_.length)
scala> /**
| * Times the evaluation of a by-name expression.
| *
| * @param f the expression to time; it is evaluated exactly once
| * @return a tuple of (elapsed time measured with System.nanoTime, formatted like "0.123 sec",
| *         the expression's result)
| */
| def ptime[A](f: => A): (String, A) = {
| val t0 = System.nanoTime
| val ans = f
| val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
| (elapsed, ans)
| }
ptime: [A](f: => A)(String, A)
scala> ptime("My function result")
res1: (String, String) = (0.000 sec,My function result)
import scala.concurrent.duration._ import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import scalaj.http._
java.lang.OutOfMemoryError: Java heap space at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:68) at java.lang.StringBuilder.<init>(StringBuilder.java:112) at scala.StringContext.standardInterpolator(StringContext.scala:123) at scala.StringContext.s(StringContext.scala:95) at $anonfun$1.apply(<console>:13) at $anonfun$1.apply(<console>:13) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) ... 21 elided
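`Range` itself is lazy, but `ids.map(...)` is strict: it tries to build all ~37 million URL strings at once, so Scala attempted to materialize the entire list in memory.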
scala> def getUrlContent(url: String) = Http(url).asString.body getUrlContent: (url: String)String scala> val idsIter = Range(4, 37120353).iterator idsIter: Iterator[Int] = non-empty iterator scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id") urls: Iterator[String] = non-empty iterator scala> urls.next() res2: String = http://stackoverflow.com/questions/4 scala> ptime(urls.take(10).map(getUrlContent).toList.map(_.length)) res3: (String, List[Int]) = (0.641 sec,List(195, 210, 195, 166, 166, 172, 171, 177, 157, 178))
/**
 * Fetches `url` after a fixed 100 ms pause, simulating a rate-limited client.
 * The pause blocks the calling thread.
 */
def getUrlContentThrottled(url: String) = {
  val pauseMillis = 100L
  Thread.sleep(pauseMillis)
  val response = Http(url).asString
  response.body
}
scala> ptime(urls.take(10).map(getUrlContentThrottled).toList.map(_.length)) res4: (String, List[Int]) = (1.256 sec,List(184, 190, 151, 157, 181, 172, 172, 172, 174, 164))
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url)) getAsync: (url: String)scala.concurrent.Future[String] scala> lazy val futures = urls.take(10).toList.map(getAsync) futures: List[scala.concurrent.Future[String]] = <lazy> scala> lazy val futureSeq = Future.sequence(futures) futureSeq: scala.concurrent.Future[List[String]] = <lazy> scala> ptime(Await.result(futureSeq, 10.seconds).map(_.length)) res5: (String, List[Int]) = (0.274 sec,List(162, 157, 177, 163, 163, 168, 216, 168, 184, 170))
import scala.concurrent.duration._ import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._
| val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator
scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator
scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String
scala> def getUrlContentThrottled(url: String) = {
| Thread.sleep(100)
| Http(url).asString.body
| }
getUrlContentThrottled: (url: String)String
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]
scala> /**
| * Times the evaluation of a by-name expression.
| *
| * @param f the expression to time; it is evaluated exactly once
| * @return a tuple of (elapsed time measured with System.nanoTime, formatted like "0.123 sec",
| *         the expression's result)
| */
| def ptime[A](f: => A): (String, A) = {
| val t0 = System.nanoTime
| val ans = f
| val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
| (elapsed, ans)
| }
ptime: [A](f: => A)(String, A)
scala> ptime("My function result")
res2: (String, String) = (0.000 sec,My function result)
object SpiderActor {
  /** Messages the spider sends to itself. */
  sealed abstract class Message
  /** Fetch `url`; delivered after [[ThrottleDelay]] via the scheduler. */
  case class Request(url: String) extends Message
  /** Body of a completed fetch. */
  case class Response(body: String) extends Message

  /** Fetches `url` inside a Future and wraps the body in a [[Response]]. */
  def getUrlContent(url: String): Future[Response] = Future {
    Response(Http(url).asString.body)
  }

  /** Delay between scheduling a request and delivering it to the spider. */
  val ThrottleDelay = 100.milliseconds
  /** Upper bound on requests that are scheduled or awaiting a response. */
  val MaxInFlight = 10

  /**
   * Crawls every URL in `urls` with a dedicated ActorSystem and collects each body's length.
   *
   * @param urls URLs to fetch; consumed by the spider actor
   * @return completes, once the spider has stopped and the ActorSystem has terminated, with the
   *         body lengths in the order they were persisted
   */
  def run(urls: Iterator[String]): Future[Seq[Int]] = {
    var lengths = Seq.empty[Int]
    // Future.successful evaluates its argument eagerly, and the spider calls persist from
    // its receive, so these appends happen on the actor's thread.
    def persist(body: String) = Future.successful {
      lengths :+= body.length
    }
    val system = ActorSystem()
    println("Starting SpiderActor")
    system.actorOf(Props(new SpiderActor(urls, persist)), "spider")
    println("SpiderActor Started")
    // The spider terminates the ActorSystem in postStop, so termination means all results are in.
    // Mapping whenTerminated directly replaces the hand-managed Promise.
    system.whenTerminated.map { _ =>
      println("SpiderActor Shut Down")
      lengths
    }
  }
}
/**
 * Throttled crawler. It fetches each URL from `urls` with at most [[SpiderActor.MaxInFlight]]
 * requests scheduled or outstanding at once, and hands every fetched body to `persist`. It stops,
 * terminating the whole ActorSystem, once `urls` is exhausted and nothing is in flight.
 *
 * @param urls    URLs to crawl; read only from this actor
 * @param persist called with each successfully fetched body; another request may be scheduled
 *                once the returned Future completes
 */
class SpiderActor(urls: Iterator[String], persist: (String) => Future[Unit]) extends Actor {
  import SpiderActor._

  // Requests scheduled or awaiting a response. Mutated only in the constructor and in receive.
  var inFlight = 0

  // Internal signal that a persist Future has finished, so another request may be scheduled.
  private case object Persisted

  private def nextRequest(): Unit = {
    if (inFlight < MaxInFlight && urls.hasNext) {
      inFlight += 1
      context.system.scheduler.scheduleOnce(ThrottleDelay, self, Request(urls.next()))
    }
    else if (inFlight == 0 && !urls.hasNext) self ! PoisonPill
  }

  nextRequest()

  def receive: Receive = {
    case Request(url) =>
      nextRequest()
      getUrlContent(url).pipeTo(self)
    case Response(body: String) =>
      inFlight -= 1
      // Re-enter via the mailbox instead of calling nextRequest() in the Future callback.
      // That callback runs on another thread and would race on inFlight and urls.
      // onComplete (rather than map) ensures a failed persist does not stall the crawl.
      persist(body).onComplete(_ => self ! Persisted)
    case Persisted =>
      nextRequest()
    case akka.actor.Status.Failure(_) =>
      // pipeTo delivers a failed fetch as Status.Failure. Free its slot and carry on; otherwise
      // inFlight never returns to 0 and the spider never stops.
      inFlight -= 1
      nextRequest()
  }

  /** Shuts down the whole ActorSystem once the spider stops. */
  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    super.postStop()
    context.system.terminate()
  }
}
scala> import com.github.easel._ import com.github.easel._ scala> val u = urls.take(10) u: Iterator[String] = non-empty iterator scala> ptime(Await.result(SpiderActor.run(u), 10.seconds)) Starting SpiderActor SpiderActor Started res3: (String, Seq[Int]) = (0.950 sec,List(191, 195, 210, 195, 166, 166, 172, 171, 177, 157))
import scala.concurrent.duration._, scala.concurrent._, scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._, akka.stream._, akka.stream.scaladsl._, scalaj.http._, com.github.easel._

/** Emits the integers 0, 1, 2, 3 and then completes. */
val source: Source[Int, akka.NotUsed] = Source(0 to 3)

/** Converts each integer to its decimal string form. */
val flow: Flow[Int, String, akka.NotUsed] = Flow[Int].map(_.toString)

/** Prints every element it receives; materializes a Future[akka.Done]. */
val sink: Sink[Any, Future[akka.Done]] = Sink.foreach(println)
scala> def run = {
| // Creates a fresh ActorSystem, runs source -> flow -> sink, waits up to 1 second for the
| // sink's Future, then shuts the system down; returns the Future from terminate().
| // NOTE: if the stream is not done within 1 second, Await.result throws and the
| // ActorSystem is never terminated.
| implicit val actorSystem = ActorSystem()
| implicit val materializer = ActorMaterializer()
| val result = source.via(flow).runWith(sink)
| Await.result(result, 1.seconds)
| actorSystem.terminate()
| }
run: scala.concurrent.Future[akka.actor.Terminated]
scala> run
0
1
2
3
res0: scala.concurrent.Future[akka.actor.Terminated] = List()
scala> val idsIter = Range(4, 37120353).iterator
idsIter: Iterator[Int] = non-empty iterator
scala> val urls = idsIter.map(id => s"http://stackoverflow.com/questions/$id")
urls: Iterator[String] = non-empty iterator
scala> def getUrlContent(url: String) = Http(url).asString.body
getUrlContent: (url: String)String
scala> def getUrlContentThrottled(url: String) = {
| Thread.sleep(100)
| Http(url).asString.body
| }
getUrlContentThrottled: (url: String)String
scala> def getAsync(url: String) = Future(getUrlContentThrottled(url))
getAsync: (url: String)scala.concurrent.Future[String]
scala> /**
| * Times the evaluation of a by-name expression.
| *
| * @param f the expression to time; it is evaluated exactly once
| * @return a tuple of (elapsed time measured with System.nanoTime, formatted like "0.123 sec",
| *         the expression's result)
| */
| def ptime[A](f: => A): (String, A) = {
| val t0 = System.nanoTime
| val ans = f
| val elapsed = f"${((System.nanoTime-t0)*1e-9)}%.3f sec"
| (elapsed, ans)
| }
ptime: [A](f: => A)(String, A)
scala> ptime("My function result")
res2: (String, String) = (0.000 sec,My function result)
/**
 * Akka Streams version of the spider: throttled fetching of URLs with bounded concurrency.
 */
object SpiderStream {
  /** Maximum number of `f` futures running at once. */
  val Concurrency = 10
  /** Number of lengths collected into each group emitted by [[flow]]. */
  val BatchSize = 100

  /** Streams the elements of `args`. The iterator is single-use, so materialize this Source only once. */
  def source(args: Iterator[String]) = Source.fromIterator(() => args)

  /**
   * Processes each element in four steps:
   *  - throttles to one element per 100 ms;
   *  - runs `f` with up to [[Concurrency]] calls in flight;
   *  - maps each result to its length;
   *  - groups the lengths into batches of up to [[BatchSize]].
   */
  def flow(f: (String) => Future[String]) = Flow[String]
    .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
    .mapAsync(Concurrency)(f)
    .map(_.length)
    .grouped(BatchSize)

  /**
   * Runs `f` over every element of `args` and returns the length of every result.
   *
   * A dedicated ActorSystem is created for the run and terminated when the stream finishes,
   * whether it succeeds or fails.
   *
   * @return all result lengths; empty when `args` is empty
   */
  def run(args: Seq[String], f: (String) => Future[String]): Future[Seq[Int]] = {
    println("Starting")
    implicit val system = ActorSystem()
    system.whenTerminated.map { _ =>
      println("Shut Down")
    }
    implicit val mat: Materializer = ActorMaterializer()
    // Concatenate every batch. Sink.head kept only the first BatchSize results and failed on
    // empty input.
    val result = source(args.iterator)
      .via(flow(f))
      .runFold(Vector.empty[Int])(_ ++ _)
    // andThen runs on success and failure alike, so the ActorSystem is always shut down. The
    // returned Future completes only after terminate() has been called.
    result.andThen { case _ => system.terminate() }(system.dispatcher)
  }
}
scala> ptime((SpiderStream.run(urls.take(10).toSeq, (x) => getAsync(x)), 10.seconds)) Starting res3: (String, (scala.concurrent.Future[Seq[Int]], scala.concurrent.duration.FiniteDuration)) = (0.039 sec,(List(),10 seconds))
/** Tour of single-input/single-output operators; consumes Ints and emits Strings. */
val singleFlow: Flow[Int, String, akka.NotUsed] = Flow[Int]
  .buffer(10, OverflowStrategy.backpressure)            // buffer up to 10; when full, backpressure upstream
  .buffer(10, OverflowStrategy.dropTail)                // buffer up to 10; when full, drop the newest buffered element
  .delay(10.millis, DelayOverflowStrategy.backpressure) // delay each element by 10 ms before passing it downstream
  .throttle(1, 10.millis, 10, ThrottleMode.Shaping)     // at most 1 element per 10 ms, allowing bursts of up to 10
  .map(_.toString)                                      // map to a new type (Int => String)
  .async                                                // introduce an async boundary
  .grouped(2)                                           // batch every 2 elements into a Seq, e.g. Seq("1", "2")
  .mapConcat(identity)                                  // flatten the groups back into single Strings
  .mapAsyncUnordered(10)(Future.successful)             // up to 10 futures at once; emit results as they complete
  .intersperse(",")                                     // insert "," between elements, similar to mkString
// Fan-out / fan-in operations at a glance. This is a reference list, not a single method
// chain to compile.
flow
.broadcast // (1 input, n outputs) emits each input element to every output
.zip // (2 inputs, 1 output) creates tuples of (A, B) from a stream of A and a stream of B
.unzip // (1 input, 2 outputs) splits tuples of (A, B) into one stream of A and one stream of B
.merge // (n inputs, 1 output) picks elements randomly from the inputs, pushing them one by one to its output
.concat // (2 inputs, 1 output) consumes the first stream completely, then the second
.interleave // mix N elements from A, then N elements from B
/**
 * Akka Streams graph stage that accepts a stream of (Master, Detail) pairs *SORTED* by Master.
 * It groups each run of consecutive pairs with the same Master into one Combined record.
 * This accomplishes the equivalent of groupBy(Master), but because the input is sorted, only
 * the group currently being built is held in memory.
 *
 * @param combine builds the output record from one complete group. Pairs are collected in a
 *                Set, so duplicate (Master, Detail) pairs within a group are collapsed.
 * @tparam Combined output record type
 * @tparam Master   grouping key, compared with ==
 * @tparam Detail   per-row payload
 */
class MasterDetailGraphStage[Combined, Master, Detail]
(combine: (Set[(Master, Detail)]) => Combined)
extends GraphStage[FlowShape[(Master, Detail), Combined]] {
type MasterDetail = (Master, Detail)
val in = Inlet[(Master, Detail)]("in")
val out = Outlet[Combined]("out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// Pairs of the group currently being accumulated; all share the Master of buffer.head.
var buffer = Set.empty[MasterDetail]
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (buffer.nonEmpty) {
if (buffer.head._1 == elem._1) {
// Same Master: extend the current group and keep reading.
buffer = buffer + elem
pull(in)
} else {
// Master changed, so the previous group is complete. Emit it and start a new group
// with elem. No pull here; the next onPull will request more input.
push(out, combine(buffer))
buffer = Set(elem)
}
} else {
// Buffer is only empty before the first element: start the first group.
buffer = Set(elem)
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
// Flush the last, still-open group before completing.
// NOTE(review): relies on emit() deferring the element when `out` has not been pulled,
// and on complete(out) taking effect after that pending emit; confirm for the Akka
// version in use.
if (buffer.nonEmpty) emit(out, combine(buffer))
complete(out)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// Forward each downstream demand upstream; onPush keeps pulling until a group boundary.
pull(in)
}
})
}
}
/**
 * Something that carries a string key, which also serves as its natural sort order.
 */
trait Keyed {
  /** Identifying key; also the sort key. */
  def key: String
}

object Keyed {
  /** Orders any [[Keyed]] subtype by its key, using String's lexicographic `compareTo`. */
  implicit def ordering[T <: Keyed]: Ordering[T] = Ordering.by[T, String](_.key)
}
/**
 * A service that provides its results page by page, in batches.
 */
trait BatchLoader[T <: Keyed] {
  /**
   * Loads one batch.
   *
   * @param offset where to resume. BatchSource passes None on its first call, then the
   *               `nextOffset` of each batch it receives.
   */
  def load(offset: Option[String]): Future[Batch[T]]

  /**
   * Exposes everything this loader serves as a Source backed by a BatchSource actor.
   * The materialized value is the actor's ref.
   */
  def source(implicit ec: ExecutionContext): Source[T, ActorRef] = {
    val publisherProps = BatchSource.props[T](this)
    Source.actorPublisher[T](publisherProps)
  }
}
/**
 * ActorPublisher that streams every item from a [[BatchLoader]], following each batch's
 * `nextOffset` until the loader runs out.
 *
 * Pages are loaded one at a time. A new page is requested either when there is outstanding
 * downstream demand, or while fewer than [[BatchSource.BUFFER_AMOUNT]] items are buffered.
 * The stream completes once no further offset is known and the buffer is empty; a failed load
 * fails the stream.
 *
 * @param loader paginated service to read from
 * @param ec     implicit context in scope when piping load results back to this actor
 */
class BatchSource[A <: Keyed](loader: BatchLoader[A])(implicit ec: ExecutionContext) extends ActorPublisher[A] {
  import BatchSource._
  import akka.stream.actor.ActorPublisherMessage._

  // True until the first page is requested; the first load is issued even though nextOffset is None.
  private var first = true
  // Offset to pass to the next load; None before the first load and once the loader is exhausted.
  private var nextOffset: Option[String] = None
  // Items loaded but not yet delivered downstream.
  private var buffer: Seq[A] = Seq.empty

  def receive: Receive = waitingForDownstreamReq(0)

  // Self-sent after each batch so buffered items and further loads are re-checked.
  case object Pull

  private def shouldLoadMore = {
    nextOffset.isDefined && (totalDemand > 0 || buffer.length < BUFFER_AMOUNT)
  }

  /**
   * Idle state: no load in progress. Serves demand from the buffer and starts a load when needed.
   * `offset` is only carried between states; nothing in this class depends on its value.
   */
  def waitingForDownstreamReq(offset: Long): Receive = {
    case Request(_) | Pull =>
      val sent = if (buffer.nonEmpty) {
        sendFromBuff(totalDemand)
      } else {
        0
      }
      if (first || (shouldLoadMore && isActive)) {
        first = false
        loader.load(nextOffset).pipeTo(self)
        context.become(waitingForFut(offset + sent, totalDemand))
      }
    case Cancel => context.stop(self)
  }

  /**
   * Emits up to `demand` buffered items. Completes the stream once the loader is exhausted and
   * the buffer is empty.
   *
   * @return number of items emitted
   */
  def sendFromBuff(demand: Long): Long = {
    // Demand can exceed Int.MaxValue (e.g. Long.MaxValue for unbounded demand). A bare
    // demand.toInt wraps that to -1, take(-1) returns nothing and the stream stalls, so clamp
    // to Int.MaxValue first.
    val consumed = buffer.take(math.min(demand, Int.MaxValue.toLong).toInt).toList
    buffer = buffer.drop(consumed.length)
    consumed.foreach(onNext)
    if (nextOffset.isEmpty && buffer.isEmpty) {
      onComplete()
    }
    consumed.length.toLong
  }

  /**
   * A load is in progress; Request/Pull messages are ignored until it finishes.
   *
   * @param s               value of `offset` to hand back to the idle state
   * @param beforeFutDemand totalDemand when the load started; served from the new batch on arrival
   */
  def waitingForFut(s: Long, beforeFutDemand: Long): Receive = {
    // A is erased at runtime, so this only checks that the message is a Batch.
    case batch: Batch[A] =>
      // An empty batch ends pagination even if it carries a nextOffset.
      nextOffset = if (batch.items.isEmpty) {
        None
      } else {
        batch.nextOffset
      }
      buffer = buffer ++ batch.items
      val consumed = sendFromBuff(beforeFutDemand)
      self ! Pull
      context.become(waitingForDownstreamReq(s + consumed))
    case Request(_) | Pull => // ignoring until we receive the future response
    case Status.Failure(err) =>
      context.become(waitingForDownstreamReq(s))
      onError(err)
    case Cancel => context.stop(self)
  }
}
object BatchSource {
  /** While fewer than this many items are buffered, a BatchSource keeps loading pages even without downstream demand. */
  final val BUFFER_AMOUNT = 1000

  /** Props for a [[BatchSource]] that reads from `loader`, using the given execution context. */
  def props[T <: Keyed](loader: BatchLoader[T])(implicit ec: ExecutionContext): Props =
    Props(new BatchSource[T](loader)(ec))
}
Further Resources