- Key to all these fancy architectures
- Becoming the de facto data processing engine
Apache Spark
Apache Spark™ is a fast and general purpose engine for large-scale data processing, with built-in modules for streaming, SQL, machine learning and graph processing
- Originally developed by Berkeley’s AMP Lab in 2009
- Open sourced as part of Berkeley Data Analytics Stack (BDAS) in 2010
- Top level apache project since 2014
- Spark provides rich APIs in Java, Scala, Python, and R
- The fundamental programming abstraction in Spark is the Resilient Distributed Dataset (RDD)
- Can deliver huge performance improvements over vanilla MapReduce
- Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3
- It is NOT a database
Why Spark
- Contributors from over 200 companies
- One of the most active open source projects
- IBM will invest roughly $300 million over the next few years and assign 3500 people to help develop Spark
Why Spark (cont...)
- Easy to develop
- Flexible, composable programming model
- Provides Spark Shell
- APIs for Scala, Java and Python
- Fast and Scalable!
- Optimised storage between memory and disk
- Scales from a single laptop to a large cluster
- Up to 10x-100x faster than Hadoop
- Feature Rich
- Supports Event Streaming Applications
- Efficient support for Machine Learning
- Modular architecture
- Lazily evaluates operations so that...
- Spark can work out the most efficient way to execute them
Spark usage scenarios
- On demand querying of large datasets
- Batch processing
- Machine Learning
- Stream Processing
- Data Transformations
- Graph processing
- Modular architecture makes it easy to extend functionality
- Spark needs data from somewhere, it's not a database
- So going back to the previous slides, Spark can run as the batch layer, running analytics which can then be output to tables (Lambda)
- Or we can run a Spark Streaming job which performs smaller calculations on the fly (Kappa)
- Could actually re-use a lot of the code between layers
Interactive Shells
- spark-shell
- Extended Scala REPL with Spark imports already in scope
- spark-submit
- Used to submit jobs (JARs) to the cluster
- spark-sql
- Interactive shell for running SQL queries against Spark
- pyspark
- Python REPL with Spark support
- Shells make programming so much easier
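As a minimal sketch of a spark-shell session (sc is the SparkContext the shell creates for you; the numbers are purely illustrative):
scala> val nums = sc.parallelize(1 to 100)
scala> nums.filter(_ % 2 == 0).count()
res0: Long = 50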
Spark Stack
- Architecture allows new modules to be added
- Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built on top of. It provides in-memory computing capabilities to deliver speed, a generalized execution model to support a wide variety of applications, and Java, Scala, and Python APIs for ease of development.
Spark Jobs
Directed Acyclic Graph
- The driver breaks the RDD graph down into jobs
- Each job is composed of stages
- Stages get implemented by one or more tasks
- Tasks are sent to the executors to perform the work
Spark Cluster Overview
Driver connects to manager. Asks it for resources
Manager assigns nodes, allocates resources
Executor started on each assigned node, isolated from other applications
Tasks sent to executors and started on nodes - multi-threaded
- Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.
- Executor lives for the length of the whole application (1-1).
- A job consists of multiple tasks spawned in response to a Spark action
- The cluster manager manages all these workers; YARN or Mesos could do this for you
- The driver is where your code lives, you can submit a packaged application or use the REPL
RDDs
- Immutable
- Each transformation will create a new RDD
- Lazy
- A DAG (directed acyclic graph) of computation is constructed
- The actual data is processed only when an action is invoked
- Reusable
- Can re-use RDDs by executing them multiple times
- An RDD is an immutable distributed collection of objects that can be operated on in parallel
- RDDs are the workhorse of Spark, everything is done using RDDs.
- Everything is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result
- Users create RDDs in three ways: by transforming an existing RDD, by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program; the last is rarely done in production as the whole dataset must fit in the driver's memory (see the sketch after this list)
- Can be persisted in memory between operations
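A minimal sketch of those three creation routes (the file name and collection are illustrative):
// 1. Distribute a local collection from the driver (fine for tests, not production)
val fromCollection = sc.parallelize(Seq("a", "b", "c"))
// 2. Load an external dataset
val fromFile = sc.textFile("words.txt")
// 3. Derive a new RDD by transforming an existing one
val fromExisting = fromFile.map(_.toUpperCase)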
RDD Partitions
- An RDD is divided into partitions which are distributed across the cluster
- Partitions never span multiple machines, i.e. tuples in the same partition are guaranteed to be on the same machine.
- The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes.
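A small sketch of inspecting and changing the partition count (file name and numbers are illustrative):
// Ask for at least 8 partitions when reading the file
val rdd = sc.textFile("words.txt", minPartitions = 8)
// See how many partitions Spark actually created
println(rdd.partitions.length)
// Redistribute the data into 4 partitions (this shuffles data)
val smaller = rdd.repartition(4)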
RDD Tasks
- One task per partition (on a per stage basis)
- A task is the smallest unit of computation
- A partition is the smallest unit of data
RDD Operations
RDD operations are split into two distinct categories
- Transformations
- Returns a new RDD which will apply the transformation
- Can merge multiple RDDs into a new RDD (union)
- map, filter, flatMap, mapPartitions, join
- Actions
- Force evaluation of the transformations
- Return a final value to the driver program or write data to an external storage system.
- reduce, collect, count, saveAs**, foreach
- No substantial benefit to writing a single complex map instead of chaining together many simple operations.
- Each operation can be executed on a different node
- There are different types of RDDs, for example numeric RDDs and PairRDDs... each has additional operations available, such as mean on numeric RDDs
- PairRDDs are really powerful and can be used to control the partitioning of data by key, reducing shuffling of data. Each key can then be processed in parallel across nodes
- Using PairRDDs lets you perform operations on just the values, which is much more efficient as the data will be in the same partition (sketched below)
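A minimal sketch of the extra operations mentioned above (the data is made up for illustration):
// Numeric RDDs gain statistical actions such as mean
val numbers = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
println(numbers.mean())
// PairRDDs gain key-based operations; reduceByKey combines values per key
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val totals = pairs.reduceByKey(_ + _)
// mapValues transforms only the values, preserving the partitioning
val doubled = totals.mapValues(_ * 2)
// collect is an action that brings the result back to the driver
doubled.collect().foreach(println)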
RDD Lineage
- Each operation on an RDD creates a new RDD, with the previous operation as part of its history.
- A lost RDD partition is reconstructed from ancestors
- By default the whole RDD lineage is executed when an action is invoked
- We can avoid this by caching (persisting) at various stages (see the sketch after this list)
- RDDs know their parents, which also know their parents...
- As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.
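A small sketch of caching to avoid re-running the whole lineage (the file name is illustrative):
val words = sc.textFile("words.txt").flatMap(_.split(" "))
// Keep the computed RDD in memory after the first action materialises it
words.cache()
// The first action runs the full lineage; the second reuses the cached partitions
println(words.count())
println(words.distinct().count())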
Basic Word Count
First add the imports and create the SparkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//Create a conf, running on two local cores
val conf = new SparkConf()
.setAppName("Simple Application")
//local[*] would use all available cores
.setMaster("local[2]")
val sc = new SparkContext(conf)
Basic Word Count (cont...)
We can now create an RDD from our SparkContext, then transform the RDD and execute it
val input: RDD[String] = sc.textFile("words.txt")
// Split up into words.
val words = input.flatMap(line => line.split(" "))
val counts = words
//pair each word with 1
.map(word => (word, 1))
//combine all matching words
.reduceByKey{case (x, y) => x + y}
// Save the word count back out to a text file.
counts.saveAsTextFile("counts.txt")
Basic Word Count (cont...)
The result is a directory of part files containing lines like the following
...
(daughter,1)
(means,1)
(this,2)
(brother,1)
(possession,1)
(is,1)
(term,1)
...
Debugging Word Count
toDebugString lets us print out the RDD lineage for debugging
println(counts.toDebugString)
(2) ShuffledRDD[4] at reduceByKey at BasicWordCount.scala:25 []
+-(2) MapPartitionsRDD[3] at map at BasicWordCount.scala:24 []
| MapPartitionsRDD[2] at flatMap at BasicWordCount.scala:19 []
| MapPartitionsRDD[1] at textFile at BasicWordCount.scala:14 []
| words.txt HadoopRDD[0] at textFile at BasicWordCount.scala:14 []
Monitoring
- http://localhost:4040
- Spark’s “stages” job console for the running application. Started by the SparkContext.
- http://master_host_name:8080
- For Spark Standalone clusters, the Spark Master web UI.
- http://slave_host_name:8081
- For Spark Standalone clusters, each Worker’s web UI.
We can view all jobs since the SparkContext was started
Clicking on a job displays more fine grained details
The lineage graph is really useful for debugging
Spark Streaming Intro
- High-throughput streaming from live events
- Run event based computations and update data in real-time
- Data can be ingested by RECEIVERS from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
- Can have reliable and unreliable receivers
- Can store raw data or aggregated (processed) data... or both!
Spark Streaming Intro (cont...)
- Data is grouped into batch windows for processing
- The "batch interval" is configurable
- Spark provides a high level abstraction called a discretized stream or DStream
- Spark receives live input data streams and divides the data into micro batches, which are then processed by the Spark engine to generate the final stream of results in batches
- Micro-batches of data are created at regular time intervals.
- This stream of micro batches is known as a DStream
- DStreams created from input streams or other DStreams
DStreams
- A DStream is a continuous sequence of RDDs
- Each micro batch of data is an RDD
- Each RDD has lineage and fault tolerance
- Transformations similar to those on normal RDDs are applicable
- There are many additional transformations and output operations that are only applicable to discretized streams
- Any operation applied on a DStream translates to operations on the underlying RDDs
- Batch interval or time step is used to configure this, typically between 500ms and several seconds
- Transformations are called on a DStream and applied to each RDD individually
- Both stateless and stateful transformations
- Stateful transformations can be used to keep counts or process aggregations on the fly
- Although aggregating in something like Cassandra is recommended instead
Basic Stream Printer
First let's add the imports and create a StreamingContext
import scala.collection.mutable.Queue
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream
val conf = new SparkConf()
.setAppName("Streaming Word Count")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
val context = ssc.sparkContext
We must have more cores available than receivers.
Define the input sources by creating input DStreams.
Define operations to be applied to the batches
Start receiving data and processing it using streamingContext.start()
Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination()
The processing can be manually stopped using streamingContext.stop()
Basic Stream Printer (cont...)
Next we need to define the receiver, here we're using a queue
//We're using a mutable queue with 3 items
val rddQueue: Queue[RDD[String]] = Queue()
rddQueue += context.textFile("words.txt")
rddQueue += context.textFile("words2.txt")
rddQueue += context.textFile("words3.txt")
- The queue has just 3 items; we could replace this with a socketTextStream, Kafka, Twitter, etc., but this makes it easy to run the examples
Basic Stream Printer (cont...)
Finally we can read, parse and start the stream
val streamBatches: InputDStream[String] =
ssc.queueStream(rddQueue, oneAtATime = true)
val words = streamBatches.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
//print the first 10 elements of each batch
wordCounts.print()
//Start the stream and wait for it to terminate
ssc.start()
ssc.awaitTermination()
- The queue acts as the source receiver; queueStream removes one item each time, so 3 batches in total
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- flatMap will split each line into multiple words and the stream of words is represented as the words DStream
Basic Stream Printer output
-------------------------------------------
Time: 1452128900000 ms
-------------------------------------------
(daughter,1)
(brother,4)
...
-------------------------------------------
Time: 1452128901000 ms
-------------------------------------------
(too,,1)
(Gauls,3)
...
...
- The output of the current batch is printed each time
Stateful Stream Printer
Let's extend this example by persisting the state between batches
val ssc = new StreamingContext(conf, Seconds(1))
// We need somewhere to store the state
// So we create a checkpoint directory
ssc.checkpoint("./checkpoint")
//Create a stream using a Dummy Receiver
val stream: ReceiverInputDStream[String] =
ssc.receiverStream(
new DummySource(ratePerSec = 1))
// Parse the stream words and pair them up
val wordDStream =
stream.flatMap(_.split(" ")).map(x => (x, 1))
- The DummySource is a custom receiver which returns the same set of words each time, in this case once per second
- Notice we're not reducing them here
Stateful Stream Printer (cont...)
We need to define a function to combine batches
def stateMappingFunc(batchTime: Time, key: String,
value: Option[Int], state: State[Long]
): Option[(String, Long)] = {
val currentVal = value.getOrElse(0).toLong
val aggVal = state.getOption.getOrElse(0L)
val sum = currentVal + aggVal
val output = (key, sum)
state.update(sum)
Some(output)
}
val stateSpec = StateSpec
.function(stateMappingFunc _)
.numPartitions(2)
.timeout(Seconds(60))
- For mapWithState we need to define a function which combines each new batch of values with the running state
- This is used to update the state of the previous batch with that of the new one
- The function must accept the following params:
- The current Batch Time
- The key for which the state needs to be updated
- The value observed at the 'Batch Time' for the key.
- The current state for the key.
- Return: the new (key, value) pair where value has the updated state information
- The StateSpec wraps the function and allows us to define properties such as initialState and numPartitions
Stateful Stream Printer (cont...)
Finally merge the states using the function and start the stream
// apply the function returning a merged stream
val wordCountStateStream =
wordDStream.mapWithState(stateSpec)
//Print the first 10 records of the stream
wordCountStateStream.print()
ssc.start()
ssc.awaitTermination()
- Since we merge based on keys, this stream will contain the same number of records as the input DStream
Stateful Stream Printer Output
-------------------------------------------
Time: 1452011283000 ms
-------------------------------------------
(buy,2)
(as,7)
(as,8)
(burden,2)
....
-------------------------------------------
Time: 1452011284000 ms
-------------------------------------------
(buy,3)
(as,12)
(as,13)
(burden,3)
....
- This time the output is cumulative, incrementing words between batches
Windowed DStreams
- Can compute results based on data in multiple batches
- Known as window(ed) transformations
- Carried over multiple batches in a sliding window
- Specify the window length and sliding interval
- window length - The duration of the window (3 in the figure).
- sliding interval - The interval at which the window operation is performed (2 in the figure); see the sketch after this list.
- Requires a checkpoint directory for stateful transformations
- For fault tolerance
- The configuration allows you to define the overlap for stateful transformations
- Every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream.
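A hedged sketch of a windowed count using those two parameters, with a 30 second window recomputed every 10 seconds (the words DStream and the streaming imports from the earlier example are assumed):
// Pair each word with a count of 1
val pairs = words.map(word => (word, 1))
// Combine counts over the last 30 seconds, sliding every 10 seconds
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // reduce function
  Seconds(30),               // window length
  Seconds(10))               // sliding interval
// Print the first 10 results of each windowed batch
windowedCounts.print()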
Twitter Hashtag Counter
This time we're consuming from the live Twitter API
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import twitter4j.Status
//Receive all tweets unfiltered
val stream: ReceiverInputDStream[Status] =
  TwitterUtils.createStream(ssc, None)
//Split tweets into words
val words = stream.flatMap(tweet =>
tweet.getText.toLowerCase.split(" "))
//Pull out the hashtags
val hashtags = words
.filter(word => word.startsWith("#"))
- Each item in the stream is an object, containing various Tweet properties, time, user, retweet, etc
- You can pass filters to the createStream
- We're not doing anything fancy with re-tweets etc
Twitter Hashtag Counter (cont...)
We want to count all tweets in the last 60 seconds
val tagCountsOverWindow = hashtags
.map((_, 1))
//60 second window
.reduceByKeyAndWindow(_ + _, Seconds(60))
val sortedTagCounts = tagCountsOverWindow
//Swap the key and value to make sorting easier
.map { case (hashtag, count) => (count, hashtag) }
.transform(_.sortByKey(false))
//Print the Top 10
sortedTagCounts.print()
- This requires shuffling data as we're swapping the keys around
- transform lets us apply RDD specific functions that DStreams don't expose
Twitter Hashtag Counter Output
-------------------------------------------
Time: 1452124409000 ms
-------------------------------------------
(7,#eurekamag)
(7,#wastehistime2016)
(6,#aldub25thweeksary)
(6,#videomtv2015)
...
-------------------------------------------
Time: 1452124411000 ms
-------------------------------------------
(8,#wastehistime2016)
(7,#eurekamag)
(6,#aldub25thweeksary)
(6,#videomtv2015)
...
Spark on Cassandra
- Specifically on DSE which makes deployment much easier
- A Spark worker is spawned which is responsible for the executors.
- This has its own memory allocation which isolates it from Cassandra
- Each worker node sits on an analytics-only Cassandra DSE instance within an analytics data centre
Deployment on Cassandra
- Spark specific data center
- Spark worker living on each Cassandra node
- Prevents Spark from interfering with normal queries
- Data is replicated by Cassandra between the data centres, both read+write
- Configured using the dse.yaml
Spark Cassandra Connector
- Lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute CQL queries in your Spark applications (see the sketch after this list)
- Open sourced by Datastax, not DSE specific
- Spark partitions are constructed from data stored by Cassandra on the same node
- Partitions are not computed until an action is seen
- Shuffling is expensive, so it's important to get partitioning correct
- Some operations explicitly shuffle data
- Allows complex queries within a partition, although Cassandra is getting more and more built-in functionality to do this
- Can do table-wide scans, though slower than HDFS
- Can use Spark to create aggregate tables, effectively materialised views
- Cassandra 3 has materialised views
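A minimal sketch of reading and writing through the connector (the keyspace, table and column names are made up for illustration):
import com.datastax.spark.connector._
// Expose a Cassandra table as an RDD of rows
val songs = sc.cassandraTable("music", "songs")
println(songs.count())
// Write an RDD of tuples back to another table, naming the target columns
sc.parallelize(Seq(("artist1", 10L), ("artist2", 3L)))
  .saveToCassandra("music", "play_counts", SomeColumns("artist", "plays"))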
Cassandra Aware Partitioning
The Cassandra connector uses Cassandra partition information to find and process data on local nodes
Nodes in the Cassandra Cluster own part of the token range
Spark connector keeps track of where all these live
These ranges are then mapped to Spark partitions
The connector uses this information to select Spark nodes local to the Cassandra data
- We can repartition datasets to better suit our application and data
- The RDD partition is on the same node as the one that holds that Cassandra partition key
Cassandra Aware Partitioning (cont...)
- Nodes in the Cassandra Cluster own part of the token range
- Spark connector keeps track of where all these live
- These ranges are then mapped to Spark partitions
- So each Spark partition knows which C* node its data is on
- The Spark driver & executor can therefore assign the task to the appropriate node
- Processing local data is faster!!
- Let's ignore vnodes for simplicity
- Uses spark.cassandra.split.size to determine how to break up the ranges into these pieces
- This is an estimate as it doesn’t read all the data
- As in it knows which Cassandra node the data is on
- Depends on the RDD being partitioned in a similar way to the Cassandra table
- The spark.locality.wait property defines how long to wait for the preferred partition/node
- Data is paged out according to the spark.cassandra.input.page.row.size property; see the sketch below
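A hedged sketch of setting the properties named above on the SparkConf (the values are purely illustrative, not recommendations):
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("Cassandra Tuning")
  // How the connector estimates splitting token ranges into Spark partitions
  .set("spark.cassandra.split.size", "100000")
  // How many rows the connector pages out of Cassandra at a time
  .set("spark.cassandra.input.page.row.size", "1000")
  // How long Spark waits for a data-local executor before falling back
  .set("spark.locality.wait", "3s")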
Cassandra Stream Word Count
First we need the appropriate imports and create the context
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector._
val conf = new SparkConf()
.setAppName("Twitter HashTag Counter")
.setMaster("local[2]")
//We need to specify the Cassandra host
.set("spark.cassandra.connection.host",
"127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(1))
- Note the cassandra configuration
Cassandra Stream Word Count (cont...)
Next we parse and filter the tweets
//Receive all tweets unfiltered
val stream: ReceiverInputDStream[Status] =
TwitterUtils.createStream(ssc, None)
//Split tweets into words
val words = stream.flatMap(tweet =>
tweet.getText.toLowerCase.split(" "))
//Pull out the hashtags and map them to a count
val hashtags = words.filter(word =>
word.startsWith("#"))
Cassandra Stream Word Count (cont...)
We can now count the words and write the data to Cassandra
//We need to define a case class
case class HashTagCount(
hashtag: String, count: Long)
hashtags
//This is exactly the same as the map -> reduce
.countByValue()
//map each (hashtag, count) pair to the case class
.map(pair => HashTagCount(pair._1, pair._2))
//save the stream to Cassandra
.saveToCassandra("spark", "wordcount")
- We're saving the case class to Cassandra, incrementing the counter
- Note that the data is not ordered here
Cassandra Stream Word Count output