Architectural challenges of IoT and Big Data

Plus a dose of Spark for good measure :)

Created by Mark Harrison / @markglh

www.cakesolutions.net

  • IoT is nothing new
  • It's just a buzzword used to describe the connection of remote "things"
  • The key idea is all those devices collect and exchange data over the internet
  • Everyone wants a piece of IoT
  • Often when they don't have any real need
  • No one size fits all solution
  • There are platforms that attempt to cover a wide range of use cases, but they're still fairly proprietary
  • No connectivity standards across devices
  • Bluetooth, zigbee, z-wave, 6LowPAN, Thread, WiFi, NFC, Sigfox, Neul, LoRaWAN
  • Because different devices have different use-cases!!
  • Although some don't and should be standardised (Samsung, LG, etc.) :|

So what is IoT?

  • The Internet of Things!
  • Numerous protocols for communication
  • Everything is connected, from RFID to Bluetooth to WiFi and everything in between
  • How can we leverage this in new applications?
    • We want to take advantage of all this available data

Many Devices

  • Tens of Billions and counting
  • Devices are often very "thin"
    • Lack processing power, memory and battery life
  • Bandwidth Concerns
    • Data ain't cheap!!
  • Need to process as much as possible on the server
  • Easier to manage if more processing is on the server
  • Sometimes we HAVE to process things on the device

What's so difficult?

  • Scaling out
    • Much more difficult than scaling up
    • This requires a different architecture than a typical web application
    • Many different devices, different data streams, different information
    • We don't want a different core application per device type
  • What to do with all this data?!?
    • Big data needs big processing!
    • Replaying it for reprocessing
  • Scaling out rather than up
  • Privacy and Security concerns
  • Complexities of interoperability of devices
    • Protocols, Interfaces, Algorithms and Discovery
    • Standards are (very slowly) emerging
  • Intermittent connectivity

Many Emerging Architectures and Stacks

  • Lambda
  • Kappa
  • SMACK
  • Zeta
  • IoT-a
  • Greek letters are all the rage

Architectures

So what does this look like?

First... CAP Theorem


  • Basically a choice between Consistency and Availability
    • Once you accept Partition tolerance, which you need if you want to scale out

First... CAP Theorem (cont...)

  • Consistency
    • Across different partitions/nodes. Every client has the same view.
    • Immediate not Eventual
  • Availability
    • The system is always available, responses are guaranteed.
    • Your queries are always executed.
  • Partition Tolerance
    • No single point of failure
    • Can handle a node disappearing
    • Replication across nodes
  • Can only guarantee two of the three database properties
  • Basically a choice between Consistency and Availability
    • Once you accept Partition tolerance, which you need if you want to scale out

Lambda Architecture

  • An immutable sequence of records is captured and fed in
  • Queries are executed upon a combination of the realtime and batch layers
  • Events are persisted so they can be replayed, either via Kafka at the input or in a historical database

Lambda Architecture (cont...)

  • Originally described by Nathan Marz, creator of Apache Storm
  • Claims to beat CAP theorem
  • Splits the application into three parts
    • Batch and Realtime layers for ingesting data
    • Serving layer for querying data
  • Events (incoming data) are time based
    • Immutable, append only
    • Series of Commands
    • No updates or deletes
    • Can replay the data
  • Duplication between layers
  • Queries are executed upon a combination of the realtime and batch layers

Lambda Architecture - Realtime Layer

  • Realtime layer handles new data
  • As accurate as possible, however the batch layer gives "the final answer"
  • CAP complexity is isolated into this layer, which now only applies to the latest subset of data
  • No need to worry about corrupting the data
  • Realtime data not yet incorporated into the batch layer
  • Data replayed in batch layer

Lambda Architecture - Batch Layer

  • Batch layer handles historical data
  • Queries in the realtime layer are recomputed after the data is persisted to the batch layer
  • Allows issues in the realtime layer to be corrected
  • More complex and involved computations, not time critical
  • Replaying data after fixing bugs etc
  • Data replayed in batch layer

Lambda Architecture - Serving Layer

  • The two “views” are merged together to give a final answer by the serving layer
    • Batch layer is indexed for querying
  • Many different options here depending on query use cases
  • Table/View per query?
  • There's no standard way of implementing this layer
  • Cassandra, HBase, Apache Drill, VoltDB
  • We're just presenting aggregated views for fast querying
  • Could use Akka persistence and hold the latest state in memory

Lambda Architecture - The good stuff

  • Fault Tolerant against both hardware and humans
  • Stores all input data unchanged, allowing for reprocessing
    • We do this in our Muvr application to re-categorise exercises with new models
  • Allows for a realtime view of the data, plus a more accurate higher latency view
  • Clean separation of concerns
  • If done correctly gives fast reads and fast writes
  • Scalable

Lambda Architecture - The bad stuff

  • Assumes accurate results can’t be computed real-time
  • Duplicates the transformation and computation logic (code) in both layers
  • More complex to debug as a result of the two discrete implementations
  • Data still not immediately consistent, hence doesn’t beat CAP theorem
  • Duplication is a BIG issue here
  • Frameworks such as Summingbird attempt to address this but aren't without their own issues and complexities
  • Can be complex creating the required combined views

Kappa Architecture

Kappa Architecture (cont...)

  • Coined by Jay Kreps, formerly of LinkedIn
  • Simplifies the Lambda architecture by removing the batch layer
  • Streams are capable of persisting the historical data
    • For example Kafka and Event Sourcing
  • The assumption is that stream processing is powerful enough to process and transform the data real-time
  • Processed data can then be stored in a table for querying
  • Sometimes we can't process the data real time
  • We can still archive data for analytics jobs

Kappa Architecture - Reprocessing

  • Start a second instance of the stream processor
  • Replay the data from the desired point in time
  • Output the results to a new table
  • Once complete, stop the original stream processor
  • Delete the old table
  • Can be done in a live application
  • The original stream is the unaltered data
  • Can always be relied upon as it's unaltered

Kappa Architecture - The good stuff

  • Fault Tolerant against both hardware and humans
  • No need to maintain two separate code bases for the layers
  • Data only reprocessed when stream processing code has changed
    • Potentially reduced hosting costs
  • Many of the same advantages as the Lambda architecture
  • Clean separation of concerns
  • Still need to create aggregated views for fast querying
  • Depends on use case of course
    • Posting the results vs making them available for querying

Kappa Architecture - The bad stuff

  • May be difficult / impossible to perform the computation on a large enough dataset
    • Stream window or stateful datasource / cache
    • Could introduce latency
  • Often you’ll need to decide on a small “window” of historical data
  • Might not be sufficient for complex Machine Learning
  • Would still need to train models for machine learning, which means a batch layer is in essence still required

Quick summary

  • Streams everywhere!!
  • Separating history from current (aggregated) data
    • Denormalised output data
    • Input format doesn’t need to match output format
    • Immutability means Non-destructive writes
  • Not new ideas, just a different way of thinking about them
  • Enforce a company wide data format for messages early on
  • High throughput important
  • Some ideas from ESBs apply here, e.g. better decoupling
  • Avro (Schemas)
  • A schema ensures data correctness, improves clarity and understanding, and improves compatibility
  • Loose coupling
  • Dealing with Schema evolution
  • Dealing with corrupt Schemas
  • Can use DB in combination with journal for historical data

Muvr use case

  • Originally used classic Lambda
  • Akka cluster for speed layer
    • One Actor per user
    • CQRS and (Akka) Event Sourcing
  • Spark (ML) batch layer
    • Trains the classifiers
  • Explain what Muvr is!
  • Great in theory!

Muvr use case - revised

  • Simply no good in production!
  • Latency too high
    • Too sensitive to network
    • Required always on connectivity
  • We had to move the speed layer onto the device
    • Or rather split it between server and device
  • It's designed to be used in a gym!!

S.M.A.C.K

  • These technologies are being used together more and more

S.M.A.C.K

  • Spark
    • Fast, distributed data processing and analytics engine
  • Mesos
    • Cluster resource management solution providing resource isolation and sharing across distributed systems
  • Akka
    • Actor based JVM framework for building highly scalable, concurrent distributed message-driven applications
  • Cassandra
    • Highly available, distributed, scalable database designed to handle huge amounts of data across multiple data centers
  • Kafka
    • A high-throughput, low-latency distributed stream based messaging system designed to handle and persist events
  • Spark is key to all these fancy architectures
  • It's becoming the de facto data processing engine

Apache Spark

Apache Spark™ is a fast and general purpose engine for large-scale data processing, with built-in modules for streaming, SQL, machine learning and graph processing
  • Originally developed by Berkeley’s AMP Lab in 2009
  • Open sourced as part of Berkeley Data Analytics Stack (BDAS) in 2010
  • Top level apache project since 2014
  • Spark provides rich APIs in Java, Scala, Python, and R
  • The fundamental programming abstraction in Spark is the Resilient Distributed Dataset (RDD)
  • Can deliver crazy performance improvements over vanilla MapReduce
  • Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3
  • It is NOT a database

Who uses Spark

Why Spark

  • Contributors from over 200 companies
  • One of the most active open source projects
  • IBM will invest roughly $300 million over the next few years and assign 3500 people to help develop Spark

Why Spark (cont...)

  • Easy to develop
    • Flexible, composable programming model
    • Provides Spark Shell
    • APIs for Scala, Java and Python
  • Fast and Scalable!
    • Optimised storage between memory and disk
    • Scales from a single laptop to a large cluster
    • Up to 10x-100x faster than Hadoop
  • Feature Rich
    • Supports Event Streaming Applications
    • Efficient support for Machine Learning
    • Modular architecture
  • Lazily evaluates this so that...
  • Spark can work out the most efficient way to do things

Spark usage scenarios

  • On demand querying of large datasets
  • Batch processing
  • Machine Learning
  • Stream Processing
  • Data Transformations
  • Graph processing
  • Modular architecture makes it easy to extend functionality
  • Spark needs data from somewhere, it's not a database
  • Going back to the previous slides, Spark can run as the batch layer, running analytics which can then be output to tables (Lambda)
  • Or we can run a Spark Streaming job which performs smaller calculations on the fly (Kappa)
  • Could actually re-use a lot of the code between layers

Interactive Shells

  • spark-shell
    • Extended Scala REPL with Spark imports already in scope
  • spark-submit
    • Used to submit jobs (JARs) to the cluster
  • spark-sql
    • SQL REPL
  • pyspark
    • Python shell
  • Shells make programming so much easier

Spark Stack

  • Architecture allows new modules to be added
  • Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built on top of. It provides in-memory computing capabilities to deliver speed, a generalized execution model to support a wide variety of applications, and Java, Scala, and Python APIs for ease of development.

Spark Jobs

Directed Acyclic Graph
  • The driver breaks the RDD graph down into jobs
  • Each job is composed of stages
  • Stages get implemented by one or more tasks
  • Tasks are sent to the executors to perform the work

Spark Cluster Overview

  • The driver connects to the cluster manager and asks it for resources
  • The manager assigns nodes and allocates resources
  • An executor is started on each assigned node, isolated from other applications
  • Tasks are sent to the executors and started on the nodes, running multi-threaded
  • Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.
  • Executor lives for the length of the whole application (1-1).
  • A Job consists of multiple tasks spawned in response to a Spark action
  • The cluster manager manages all these workers; YARN or Mesos could do this for you
  • The driver is where your code lives, you can submit a packaged application or use the REPL

RDDs

  • Immutable
    • Each transformation will create a new RDD
  • Lazy
    • A DAG (directed acyclic graph) of computation is constructed
    • The actual data is processed only when an action is invoked
  • Reusable
    • Can re-use RDDs by executing them multiple times
  • An RDD is an immutable distributed collection of objects that can be operated on in parallel
  • RDDs are the workhorse of Spark, everything is done using RDDs.
  • Everything is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result
  • Users create RDDs in three ways: by transforming an existing RDD, by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program; the latter isn't used in production as the whole dataset must fit in memory (see the sketch below)
  • Persistent in memory between operations
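A minimal sketch of those three ways to create an RDD, assuming a SparkContext named sc (as created in the word count example later) and a local words.txt file:
import org.apache.spark.rdd.RDD

// 1. Distribute a local collection (fine for tests, not production,
//    as the whole dataset must fit in the driver's memory)
val fromCollection: RDD[Int] = sc.parallelize(1 to 1000)

// 2. Load an external dataset
val fromFile: RDD[String] = sc.textFile("words.txt")

// 3. Transform an existing RDD into a new one
val upperCased: RDD[String] = fromFile.map(_.toUpperCase)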

RDD Partitions

  • An RDD is divided into partitions which are distributed across the cluster
  • Partitions never span multiple machines, i.e. tuples in the same partition are guaranteed to be on the same machine.
  • The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes (see the sketch below).
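A small sketch of inspecting and changing partitioning, assuming the same sc and words.txt; the file name and partition counts are arbitrary:
// Ask for a minimum of 8 partitions when reading the file
val input = sc.textFile("words.txt", minPartitions = 8)

// See how many partitions the RDD actually has
println(input.getNumPartitions)

// Redistribute the data across 4 partitions (this causes a shuffle)
val repartitioned = input.repartition(4)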

RDD Tasks

  • One task per partition (on a per stage basis)
  • A task is the smallest unit of computation
  • A partition is the smallest unit of data

RDD Operations

RDD operations are split into two distinct categories

  • Transformations
    • Returns a new RDD which will apply the transformation
    • Can merge multiple RDDs into a new RDD (union)
    • map, filter, flatMap, mapPartitions, join
  • Actions
    • Force evaluation of the transformations
    • Return a final value to the driver program or write data to an external storage system.
    • reduce, collect, count, saveAs**, foreach
  • No substantial benefit to writing a single complex map instead of chaining together many simple operations.
  • Each operation can be executed on a different node
  • There are different types of RDDs, for example numeric RDDs and PairRDDs; each has some additional operations available (mean, for example)
  • PairRDDs are really powerful and can be used to control the partitioning of data by key, reducing shuffling of data. Each key can then be processed in parallel across nodes
  • Using PairRDDs lets you perform operations just on the values, which is much more efficient as the data will be on the same partition (see the sketch below)
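A minimal sketch contrasting lazy transformations with actions, plus a PairRDD operation; it assumes a SparkContext named sc as in the word count example later:
// Transformations are lazy: these lines only build the DAG, nothing runs yet
val nums = sc.parallelize(1 to 100)
val evens = nums.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)

// Actions force evaluation and return results to the driver
val total = doubled.reduce(_ + _)   // 5100
val sample = doubled.take(5)        // Array(4, 8, 12, 16, 20)

// Pairing each value with a key gives a PairRDD with extra operations
val byLastDigit = doubled.map(n => (n % 10, n))
val sums = byLastDigit.reduceByKey(_ + _).collect()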

RDD Lineage

  • Each operation on an RDD creates a new RDD, with the previous operation as part of its history.
  • A lost RDD partition is reconstructed from ancestors
  • By default the whole RDD lineage is executed when an action is invoked
    • We can avoid this by caching (persisting) at various stages (see the sketch below)
  • RDDs know their parents, which also know their parents...
  • As you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.
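A small sketch of truncating recomputation by persisting an intermediate RDD, assuming the same sc and words.txt:
import org.apache.spark.storage.StorageLevel

val words = sc.textFile("words.txt").flatMap(_.split(" "))

// Persist the intermediate RDD so repeated actions don't re-read and re-split the file
words.persist(StorageLevel.MEMORY_ONLY)   // equivalent to words.cache()

println(words.count())             // first action materialises and caches the RDD
println(words.distinct().count())  // reuses the cached partitions

// Print the lineage Spark tracks for fault recovery
println(words.toDebugString)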

Basic Word Count

First add the imports and create the SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//Create a conf, running on two local cores
val conf  = new SparkConf()
   .setAppName("Simple Application")
   //local[*] would use all cores
   .setMaster("local[2]")

val sc = new SparkContext(conf)

Basic Word Count (cont...)

We can now create an RDD from our SparkContext, we can then transform the RDD and execute it
val input: RDD[String] = sc.textFile("words.txt")
// Split up into words.
val words = input.flatMap(line => line.split(" "))

val counts = words
   //pair each word with 1
   .map(word => (word, 1))
   //combine all matching words
   .reduceByKey{case (x, y) => x + y}

// Save the word count back out to a text file.
counts.saveAsTextFile("counts.txt")

Basic Word Count (cont...)

The result is a directory of part files containing lines like the following
...
(daughter,1)
(means,1)
(this,2)
(brother,1)
(possession,1)
(is,1)
(term,1)
...

Debugging Word Count

toDebugString lets us print out the RDD lineage for debugging
println(counts.toDebugString)

(2) ShuffledRDD[4] at reduceByKey at BasicWordCount.scala:25 []
 +-(2) MapPartitionsRDD[3] at map at BasicWordCount.scala:24 []
    |  MapPartitionsRDD[2] at flatMap at BasicWordCount.scala:19 []
    |  MapPartitionsRDD[1] at textFile at BasicWordCount.scala:14 []
    |  words.txt HadoopRDD[0] at textFile at BasicWordCount.scala:14 []

Monitoring

  • http://localhost:4040
    • Spark’s “stages” job console. Started by the SparkContext.
  • http://master_host_name:8080
    • For Spark Standalone clusters, the Spark Master web UI.
  • http://slave_host_name:8081
    • For Spark worker (slave) node web UIs
We can view all jobs since the SparkContext was started
Clicking on a job displays more fine grained details
The lineage graph is really useful for debugging

Spark Streaming Intro

  • High-throughput streaming from live events
  • Run event based computations and update data in real-time
  • Data can be ingested by RECEIVERS from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
  • Can have reliable and unreliable receivers
  • Can store raw data or aggregated (processed) data... or both!

Spark Streaming Intro (cont...)

  • Data is grouped into batch windows for processing
  • The "batch interval" is configurable
  • Spark provides a high level abstraction called a discretized stream or DStream
  • Spark receives live input data streams and divides the data into micro batches, which are then processed by the Spark engine to generate the final stream of results in batches
  • Micro-batches of data are created at regular time intervals.
  • This stream of micro batches is known as a DStream
  • DStreams created from input streams or other DStreams

DStreams

  • A DStream is a continuous sequence of RDDs
    • Each micro batch of data is an RDD
  • Each RDD has lineage and fault tolerance
  • Transformations similar to those on normal RDDs are applicable
  • There are many additional transformations and output operations that are only applicable to discretized streams
  • Any operation applied on a DStream translates to operations on the underlying RDDs
  • Batch interval or time step is used to configure this, typically between 500ms and several seconds
  • Transformations are called on a DStream and applied to each RDD individually
  • Both stateless and stateful transformations
  • Stateful transformations can be used to keep counts or process aggregations on the fly
  • Although aggregating in something like Cassandra is recommended instead

Basic Stream Printer

First lets add the imports and create a StreamingContext
import scala.collection.mutable.Queue

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream

val conf = new SparkConf()
   .setAppName("Streaming Word Count")
   .setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(1))
val context = ssc.sparkContext
  • We must have more cores available than receivers
  • Define the input sources by creating input DStreams
  • Define the operations to be applied to the batches
  • Start receiving data and processing it using streamingContext.start()
  • Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination()
  • The processing can be manually stopped using streamingContext.stop()

Basic Stream Printer (cont...)

Next we need to define the receiver, here we're using a queue
//We're using a mutable queue with 3 items
val rddQueue: Queue[RDD[String]] = Queue()
rddQueue += context.textFile("words.txt")
rddQueue += context.textFile("words2.txt")
rddQueue += context.textFile("words3.txt")
  • The queue has just 3 items, we could replace this with a socketTextStream, kafka, twitter, etc. But this makes it easy to run the examples

Basic Stream Printer (cont...)

Finally we can read, parse and start the stream
val streamBatches: InputDStream[String] = 
   ssc.queueStream(rddQueue, oneAtATime = true)

val words = streamBatches.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

//print the first 10 elements of each batch
wordCounts.print()

//Start the stream and wait for it to terminate
ssc.start()
ssc.awaitTermination()
  • The queue acts as the source receiver; queueStream removes one item each time, so 3 batches in total
  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • flatMap will split each line into multiple words and the stream of words is represented as the words DStream

Basic Stream Printer output

-------------------------------------------
Time: 1452128900000 ms
-------------------------------------------
(daughter,1)
(brother,4)
...

-------------------------------------------
Time: 1452128901000 ms
-------------------------------------------
(too,,1)
(Gauls,3)
...
...
  • The output of the current batch is printed each time

Stateful Stream Printer

Lets extend this example by persisting the state between batches
val ssc = new StreamingContext(conf, Seconds(1))
// We need somewhere to store the state
// So we create a checkpoint directory
ssc.checkpoint("./checkpoint") 
//Create a stream using a Dummy Receiver
val stream: ReceiverInputDStream[String] = 
   ssc.receiverStream(
      new DummySource(ratePerSec = 1))

// Parse the stream words and pair them up
val wordDStream = 
   stream.flatMap(_.split(" ")).map(x => (x, 1))
  • The DummySource is a custom receiver which returns the same set of words each time, in this case once per second
  • Notice we're not reducing them here

Stateful Stream Printer (cont...)

We need to define a function to combine batches
def stateMappingFunc(batchTime: Time, key: String,
    value: Option[Int], state: State[Long]
    ): Option[(String, Long)] = {
  val currentVal = value.getOrElse(0).toLong
  val aggVal = state.getOption.getOrElse(0L)
  val sum = currentVal + aggVal
  val output = (key, sum)
  state.update(sum)
  Some(output)
}
val stateSpec = StateSpec
  .function(stateMappingFunc _)
  .numPartitions(2)
  .timeout(Seconds(60))
  • For mapWithState we need to define a function which merges each new batch's values into the running state for each key
  • This is used to update the state of the previous batch with that of the new one
  • The function must accept the following params:
  • The current Batch Time
  • The key for which the state needs to be updated
  • The value observed at the 'Batch Time' for the key.
  • The current state for the key.
  • Return: the new (key, value) pair where value has the updated state information
  • The StateSpec wraps the function and allows us to define properties such as initialState and partitions

Stateful Stream Printer (cont...)

Finally merge the states using the function and start the stream
// apply the function returning a merged stream
val wordCountStateStream = 
   wordDStream.mapWithState(stateSpec)

//Print the first 10 records of the stream
wordCountStateStream.print()

ssc.start()
ssc.awaitTermination()
  • Since we merge the streams based on keys, this stream will contain the same # of records as the input dstream

Stateful Stream Printer Output

-------------------------------------------
Time: 1452011283000 ms
-------------------------------------------
(buy,2)
(as,7)
(as,8)
(burden,2)
....
-------------------------------------------
Time: 1452011284000 ms
-------------------------------------------
(buy,3)
(as,12)
(as,13)
(burden,3)
....
  • This time the output is cumulative, incrementing words between batches

Windowed DStreams

  • Can compute results based on data in multiple batches
  • Known as window(ed) transformations
  • Carried over multiple batches in a sliding window
  • Specify the window length and sliding interval
  • window length - The duration of the window (3 in the figure).
  • sliding interval - The interval at which it's performed (2 in the figure).
  • Requires a checkpoint directory for stateful transformations
  • For fault tolerance
  • The configuration allows you to define the overlap for stateful transformations
  • Every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream (see the sketch below)
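A minimal windowed sketch, assuming a DStream of words like the one in the earlier stream printer; the 30 second window and 10 second slide are arbitrary choices:
// Count the words seen in the last 30 seconds, recomputed every 10 seconds
val windowedCounts = words
   .map(word => (word, 1))
   .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

//Print the first 10 elements of each windowed batch
windowedCounts.print()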

Twitter Hashtag Counter

This time we're consuming from the live Twitter API
//Receive all tweets unfiltered
val stream: ReceiverInputDStream[Status] = 
   TwitterUtils.createStream(ssc, None)

//Split tweets into words
val words = stream.flatMap(tweet => 
   tweet.getText.toLowerCase.split(" "))

//Pull out the hashtags
val hashtags = words
   .filter(word => word.startsWith("#"))
  • Each item in the stream is an object, containing various Tweet properties, time, user, retweet, etc
  • You can pass filters to the createStream
  • We're not doing anything fancy with re-tweets etc

Twitter Hashtag Counter (cont...)

We want to count all tweets in the last 60 seconds
val tagCountsOverWindow = hashtags
   .map((_, 1))
   //60 second window
   .reduceByKeyAndWindow(_ + _, Seconds(60))

val sortedTagCounts = tagCountsOverWindow
   //Swap the key and value to make sorting easier
   .map { case (hashtag, count) => (count, hashtag) }
   .transform(_.sortByKey(false))

//Print the Top 10
sortedTagCounts.print()
  • This requires shuffling data as we're swapping the keys around
  • transform lets us apply RDD specific functions that DStreams don't expose

Twitter Hashtag Counter Output

-------------------------------------------
Time: 1452124409000 ms
-------------------------------------------
(7,#eurekamag)
(7,#wastehistime2016)
(6,#aldub25thweeksary)
(6,#videomtv2015)
...
-------------------------------------------
Time: 1452124411000 ms
-------------------------------------------
(8,#wastehistime2016)
(7,#eurekamag)
(6,#aldub25thweeksary)
(6,#videomtv2015)
...

Spark on Cassandra

  • Specifically on DSE which makes deployment much easier
  • A Spark worker is spawned which is responsible for the executors.
  • This has its own memory allocation which isolates it from Cassandra
  • Each worker node sits on an analytics-only Cassandra DSE instance within an analytics data centre

Deployment on Cassandra

  • Spark specific data center
  • Spark worker living on each Cassandra node
  • Prevents Spark interfering with normal queries
  • Data is replicated by Cassandra between the data centres, both read+write
  • Configured using the dse.yaml

Spark Cassandra Connector

  • Lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute CQL queries in your Spark applications (see the sketch after this list)
  • Open sourced by Datastax, not DSE specific
  • Spark partitions are constructed from data stored by Cassandra on the same node
  • Partitions are not computed until an action is seen
  • Shuffling is expensive, so it's important to get partitioning correct
  • Some operations explicitly shuffle data
  • Allows complex queries within a partition, although Cassandra is getting more and more built-in functionality to do this
  • Can do table-wide scans, though slower than HDFS
  • Can use Spark to create aggregate tables, effectively materialised views
  • Cassandra 3 has materialised views
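A minimal sketch of reading and writing with the connector, reusing the spark keyspace and wordcount table from the streaming example below; the popular_hashtags table and the count threshold are hypothetical:
import com.datastax.spark.connector._

// Expose a Cassandra table as an RDD of CassandraRow
val hashtagRows = sc.cassandraTable("spark", "wordcount")

// Filter on the Spark side and pull out the columns we care about
val popular = hashtagRows
   .filter(row => row.getLong("count") > 100)
   .map(row => (row.getString("hashtag"), row.getLong("count")))

// Write the result back out to another (pre-created) table
popular.saveToCassandra("spark", "popular_hashtags",
   SomeColumns("hashtag", "count"))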

Cassandra Aware Partitioning

  • The Cassandra connector uses Cassandra partition information to find and process data on local nodes
  • Nodes in the Cassandra cluster own part of the token range
  • The Spark connector keeps track of where all these live
  • These ranges are then mapped to Spark partitions
  • The connector uses this information to select Spark nodes local to the Cassandra data
  • We can repartition datasets to better suit our application and data
  • The RDD partition is on the same node as the node that holds that Cassandra partition key

Cassandra Aware Partitioning (cont...)

  • Nodes in the Cassandra Cluster own part of the token range
  • Spark connector keeps track of where all these live
  • These ranges are then mapped to Spark partitions
  • So each Spark partition knows which C* node its data is on
  • The Spark driver & executor can therefore assign the task to the appropriate node
  • Processing local data is faster!!
  • Lets ignore vnodes for simplicity
  • Uses spark.cassandra.split.size to determine how to break up the ranges into these pieces
  • This is an estimate as it doesn’t read all the data
  • As in it knows which Cassandra node the data is on
  • Depends on the RDD being partitioned in a similar way to the Cassandra table
  • The spark.locality.wait property defines how long to wait for the preferred partition/node
  • Data is paged out according to the spark.cassandra.input.page.row.size property (see the config sketch after this list)
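A hedged config sketch using the properties named above; exact property names vary between connector versions (this slide calls the split setting spark.cassandra.split.size), and the values here are arbitrary:
import org.apache.spark.SparkConf

val conf = new SparkConf()
   .setAppName("Cassandra Tuning Example")
   //How Cassandra token ranges are split into Spark partitions
   //(check the property name for your connector version)
   .set("spark.cassandra.input.split.size", "100000")
   //How many rows are paged out of Cassandra per fetch
   .set("spark.cassandra.input.page.row.size", "1000")
   //How long (ms) a task waits for a data-local node before running elsewhere
   .set("spark.locality.wait", "3000")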

Cassandra Stream Word Count

First we need the appropriate imports and create the context
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector._

val conf = new SparkConf()
   .setAppName("Twitter HashTag Counter")
   .setMaster("local[2]")
   //We need to specify the Cassandra host
   .set("spark.cassandra.connection.host", 
           "127.0.0.1")

val ssc = new StreamingContext(conf, Seconds(1))
  • Note the cassandra configuration

Cassandra Stream Word Count (cont...)

Next we parse and filter the tweets
//Receive all tweets unfiltered
val stream: ReceiverInputDStream[Status] = 
   TwitterUtils.createStream(ssc, None)

//Split tweets into words
val words = stream.flatMap(tweet => 
   tweet.getText.toLowerCase.split(" "))

//Pull out the hashtags
val hashtags = words.filter(word => 
   word.startsWith("#"))
  • Not much new here

Cassandra Stream Word Count (cont...)

We can now count the words and write the data to Cassandra
//We need to define a case class
case class HashTagCount(
   hashtag: String, count: Long)

hashtags
   //This is exactly the same as the map -> reduce
   .countByValue()
   //map to the case class
   .map(rdd => HashTagCount(rdd._1, rdd._2))
   //save the stream to Cassandra
   .saveToCassandra("spark", "wordcount")
  • We're saving the case class to Cassandra, writing the count for each hashtag
  • Note that the data is not ordered here

Cassandra Stream Word Count output

That's all folks...

Architectural challenges of IoT and Big Data

Plus a dose of Spark for good measure :)

Created by Mark Harrison / @markglh