On GitHub: RBeaudoin/cassandra-day-presentation
Presented by Rich Beaudoin / @RichGBeaudoin, October 14th, 2014
Approaches like MapReduce read from, and store to, HDFS...
...so each cycle of processing incurs latency from HDFS reads.
Any robust, distributed data processing framework needs fault tolerance.
But existing solutions allow for "fine-grained" (cell-level) updates, which can complicate the handling of faults where data needs to be rebuilt/recalculated.
Solution 1: store intermediate results in memory
Solution 2: introduce a new expressive data abstraction
*See the RDD white paper for more details
A "transformation" creates another RDD and is evaluated lazily
An "action" returns a value and is evaluated immediately
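The lazy-vs-eager distinction can be sketched with plain Scala collections as an analogy (illustrative only, not the Spark API): building a lazy view does no work, like a transformation, while forcing a result triggers evaluation, like an action.

```scala
// Analogy in plain Scala: a lazy view behaves like a transformation,
// forcing a value behaves like an action.
var evaluated = 0

// "transformation": nothing is computed when the view is built
val mapped = (1 to 5).view.map { x => evaluated += 1; x * 2 }
println(evaluated)  // 0: no elements evaluated yet

// "action": summing forces evaluation of every element
val total = mapped.sum
println(evaluated)  // 5: all elements evaluated
println(total)      // 30
```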
It turns out that coarse-grained operations cover many existing parallel computing use cases
Consequently, the RDD abstraction can implement existing systems like MapReduce, Pregel, Dryad, etc.
Spark can be run with Apache Mesos, Hadoop YARN, or its own standalone cluster manager
If we can turn Cassandra data into RDDs, and RDDs into Cassandra data, then the data can start flowing between the two systems and give us some insight into our data.
The Spark Cassandra Connector allows us to perform the transformation from a Cassandra table to an RDD, and then back again!
import org.apache.spark._
import com.datastax.spark.connector._

val rdd = sc.cassandraTable("music", "albums_by_artist")
Run these commands in spark-shell; this requires specifying the spark-cassandra-connector jar on the command line
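A launch command along these lines does the job (the jar path and Cassandra host below are illustrative placeholders, not values from the presentation):

```shell
# Start spark-shell with the connector on the classpath and point it at Cassandra
spark-shell \
  --jars /path/to/spark-cassandra-connector-assembly.jar \
  --conf spark.cassandra.connection.host=127.0.0.1
```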
val count = rdd.map(x => (x.get[String]("label"), 1)).reduceByKey(_ + _)
count.saveToCassandra("music", "label_count", SomeColumns("label", "count"))
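For intuition, the map + reduceByKey step above can be approximated on local Scala collections with groupBy (a sketch of the semantics, not the Spark implementation; the sample labels are invented):

```scala
// Local-collections analogy of map(x => (label, 1)).reduceByKey(_ + _)
val labels = Seq("Sub Pop", "Matador", "Sub Pop")  // stand-in for the "label" column

val counts = labels
  .map(l => (l, 1))                                // pair each label with a 1
  .groupBy(_._1)                                   // gather the pairs by key
  .map { case (label, pairs) => label -> pairs.map(_._2).sum }  // sum the 1s per key
```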
import org.apache.spark.sql.cassandra.CassandraSQLContext

val cc = new CassandraSQLContext(sc)
val rdd = cc.sql("SELECT * FROM music.label_count")
import sqlContext.createSchemaRDD
import org.apache.spark.sql._

case class LabelCount(label: String, count: Int)
case class AlbumArtist(artist: String, album: String, label: String, year: Int)
case class AlbumArtistCount(artist: String, album: String, label: String, year: Int, count: Int)

val albumArtists = sc.cassandraTable[AlbumArtist]("music", "albums_by_artists").cache
val labelCounts = sc.cassandraTable[LabelCount]("music", "label_count").cache

val albumsByLabelId = albumArtists.keyBy(x => x.label)
val countsByLabelId = labelCounts.keyBy(x => x.label)

val joinedAlbums = albumsByLabelId.join(countsByLabelId).cache
val albumArtistCountObjects = joinedAlbums.map(x =>
  AlbumArtistCount(x._2._1.artist, x._2._1.album, x._2._1.label, x._2._1.year, x._2._2.count))
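The keyBy + join pattern above has the same shape on local Scala collections; this sketch (with made-up sample data, and a Map standing in for the second keyed RDD) shows what the join produces:

```scala
// Local-collections sketch of keyBy + inner join; Spark distributes this,
// but the shape of the computation is the same.
case class AlbumArtist(artist: String, album: String, label: String, year: Int)
case class LabelCount(label: String, count: Int)
case class AlbumArtistCount(artist: String, album: String, label: String, year: Int, count: Int)

val albumArtists = Seq(AlbumArtist("Nirvana", "Bleach", "Sub Pop", 1989))  // invented sample row
val labelCounts  = Seq(LabelCount("Sub Pop", 2))                           // invented sample row

// keyBy: pair each record with its join key
val albumsByLabel = albumArtists.map(a => a.label -> a)
val countsByLabel = labelCounts.map(c => c.label -> c).toMap

// inner join on label, then build the combined objects, as the RDD version does
val albumArtistCountObjects = albumsByLabel.flatMap { case (label, a) =>
  countsByLabel.get(label).map(c =>
    AlbumArtistCount(a.artist, a.album, a.label, a.year, c.count))
}
```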
References