How it all began
“Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.”
- Created by engineers from Typesafe, Netflix, Redhat, and more) plus Doug Lea :)
-
Why
- Efficiently processing large indeterminate streams is hard
- Avoiding blocking is essential to maximise performance
- Every stage in the stream needs to be able to push and pull
- We don't want to overload (or starve!) downstream consumers...
- we want to pass data across applications, nodes, CPUs, threads, actors, etc
-
How
- Treat data as a stream of elements
- Asynchronous non-blocking data and demand flows
- Demand flows upstream, causing data to flow downstream
- Data flow is therefore restricted by demand
- Demand happens on a separate flow!
- Demand can be in blocks (give me the next 50!!)
-
What
- The Reactive Streams specification is just that
- A collection of interfaces methods and protocols
- Provides example implementations and a TCK for verification
- Aimed at providing a way to build common implementations
- It's just a specification!!
-
Introducing Akka Streams!!
Akka's implementation of Reactive Streams
Design Principles
- Explicitness over magic (I'm looking at you Shapeless!)
- Fully composable
- Each component, or set of componenents can be combined
- Each building block is immutable
- Fully compatible with other Reactive Stream implementations
Building blocks cont...
- Source
- Traditionally known as a producer
- Supplies messages that will flow downstream
- Exactly one output stream
- Sink
- Traditionally known as a consumer
- End point of the stream, this is where messages end up
Building blocks cont...
- Flow
- A processing stage in the Stream
- Used to compose Streams
- Exactly one input and one output stream
- See also BidirectionalFlow (two in -> two out)
- Between the Source and Sink
- BidirectionalFlow is one of many - more later
Building blocks cont...
- RunnableGraphs
- A pre-assembled set of Stream components, packaged into a Graph.
- All exposed ports are connected (between a Source and Sink)
- This can then be Materialized
Building blocks cont...
- Composite Flows
- It is possible to wrap several components into more complex ones
- This composition can then be treated as one block
- Partial Flow Graphs
- An incomplete Flow (Graph)
- Can be used to construct more complex Graphs easily
Building blocks cont...
- Materializer
- Once complete, the flow is Materialized in order to start stream processing
- Supports fully distributed stream processing
- Each step must be either serializable immutable values or ActorRefs
- Fails immediately at runtime if the Graph isn't complete
Errors vs Failures
- Errors handlied within the stream as normal data elements
- Passed using the onNext function
- Failure means that the stream itself has failed and is collapsing
- Raises the onError signal... (???)
- Each block in the flow can choose to absorb or propagate the errors
- Possibly resulting the the complete collapse of the flow
- Supervisor strategies to manage how the stream handles failures
-
First things first
We need to create an ActorSystem and Materializer
implicit val system = ActorSystem("actors")
implicit val materializer = ActorMaterializer()
Simple Stream
We need to create an ActorSystem and Materializer
Source(1 to 5)
.filter(_ < 3) // 1, 2
.map(_ * 2) // 2, 4
.to(Sink.foreach(println))
.run()
//prints 2 4
- The fixed source produces Int's from 1 to 5
- Filters, maps then passes them to a Sink which prints the results.
Composing elements together
We can combine multiple components together
Composing elements together
val nestedSource = Source(1 to 5)
.map(_ * 2)
val nestedFlow = Flow[Int]
.filter(_ <= 4)
.map(_ + 2)
val sink = Sink.foreach(println)
//link up the Flow to a Sink
val nestedSink = nestedFlow.to(Sink.foreach(println))
// Create a RunnableGraph - and run it! Prints 4 6
nestedSource.to(nestedSink).run()
- Really useful when building complex streams
-
Composing elements together cont...
Alternatively we could do this, linking them in one step
nestedSource
.via(nestedFlow)
.to(Sink.foreach(println(_)))
- via used to link up flows (processing stages)
-
Composing elements together cont...
Graph Processing Stages
- Fan Out
-
Broadcast[T] – (1 input, N outputs)
-
Balance[T] – (1 input, N outputs)
- ...
- Fan In
-
Merge[In] – (N inputs , 1 output)
- ...
- Timer Driven
-
groupedWithin(Int, Duration)
-
Groups elements when either the number or duration is reached (whichever is first). Very useful for batching messages.
- See the Akka Stream docs for more!
Graph Processing Stages cont...
The Graph DSL
- Whenever you want to perform multiple operations to control the Flow of a Graph, manually constructing them as above can become very clumbersome and tedius, not to mentioned hard to maintain.
- For this reason the Akka team have written a DSL to help write complex Graphs.
- DSL is Scala only
- Makes visualising graphs in the code much easier!!
The Graph DSL
val g = FlowGraph.closed() {
implicit builder: FlowGraph.Builder[Unit] =>
//This provides the DSL
import FlowGraph.Implicits._
val in = Source(1 to 3)
val out = Sink.foreach(println)
//2 outputs, 2 inputs
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
}
g.run() //Prints 31 31 32 32 33 33
- closed() ensures we end up with a complete graph (can also use partial instead...
- Makes visualising graphs in the code much easier!!
Example - Reactive Kafka
- The guys at SoftwareMill have implemented a wrapper for Apache Kafka
- Tried and tested by yours truly
Example - Reactive Kafka cont...
- Source is a Kafka Consumer
- Sink is a Kafka Publisher
val kafka = new ReactiveKafka()
val publisher: Publisher[StringKafkaMessage] =
kafka.consume(
ConsumerProperties(...)
)
val subscriber: Subscriber[String] =
kafka.publish(
ProducerProperties(...)
)
Source(publisher).map(_.message().toUpperCase)
.to(Sink(subscriber)).run()
A real world example cont...
FlowGraph.closed() {
implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val in = Source(kafkaConsumer)
val out = Sink.foreach(println)
val bcast = builder
.add(Broadcast[StringKafkaMessage](2))
val merge = builder
.add(Merge[StringKafkaMessage](2))
val parser1, parser2 = Flow[StringKafkaMessage]
.map(...)
val group = Flow[StringKafkaMessage].grouped(4)
in ~> bcast ~> parser1 ~> merge ~> group ~> out
bcast ~> parser2 ~> merge
}.run()
Akka Streams
From Zero to Kafka
Created by Mark Harrison / @markglh