What is Spark Streaming
- An extension of the core Spark API
- Enables scalable, high-throughput, fault-tolerant stream processing of live data streams
- Ingests data from many sources such as Kafka, Twitter, or TCP sockets
- Provides high-level functions such as map, reduce, and window
How Spark Streaming works
- Uses a DStream (discretized stream) to represent a continuous stream of data
- A DStream is a sequence of RDDs
Spark Streaming workflow
- Everything starts with a StreamingContext
- Add an input DStream and its receiver
- Perform transformations on the DStreams
- Output the results
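A minimal sketch of these four steps, assuming a socket source on localhost:9999 and a 1-second batch interval (the full NetworkWordCount example at the end of this section fills in the details):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 1. Everything starts with a StreamingContext (1-second batches)
sc = SparkContext("local[*]", "WorkflowSketch")
ssc = StreamingContext(sc, 1)

# 2. Add an input DStream and its receiver (a socket source here)
lines = ssc.socketTextStream("localhost", 9999)

# 3. Perform transformations on the DStream
words = lines.flatMap(lambda line: line.split(" "))

# 4. Output the results, then start the computation
words.pprint()
ssc.start()
ssc.awaitTermination()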
StreamingContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# master: the cluster URL (or "local[*]"); appName: the name shown in the Spark UI
sc = SparkContext(master, appName)
# batch_interval: how often (in seconds) received data is grouped into a batch
ssc = StreamingContext(sc, batch_interval)
- When running on a local machine, set master to "local[*]"
- Set an appropriate batch interval for your application
Input DStreams
- Basic Sources:
  - Socket Streams: ssc.socketTextStream(hostname, port)
  - File Streams: ssc.textFileStream(dataDirectory)
  - Queue of RDDs (see the queueStream sketch after this list)
- Advanced Sources:
  - Kafka
  - Flume
  - Kinesis
  - Twitter
  - and more...
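The queue-of-RDDs source is mainly useful for testing without an external data source. A minimal sketch, using made-up illustration data:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "QueueStreamSketch")
ssc = StreamingContext(sc, 1)

# Each RDD in the queue is served as one batch of the input DStream
rddQueue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(5)]
inputStream = ssc.queueStream(rddQueue)

inputStream.pprint()
ssc.start()
ssc.awaitTerminationOrTimeout(10)  # run for about 10 seconds, then stop
ssc.stop()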
Transformations on DStreams
- transform
- join
- updateStateByKey
- window operations
- and more...
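As a sketch of transform and join, using queue-based input DStreams with made-up illustration data (names such as blockedKeys are placeholders, not part of the Spark API):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "TransformJoinSketch")
ssc = StreamingContext(sc, 1)

# Two (key, value) DStreams built from queues of RDDs (illustration data only)
stream1 = ssc.queueStream([sc.parallelize([("a", 1), ("b", 2)])])
stream2 = ssc.queueStream([sc.parallelize([("a", "x"), ("c", "y")])])

# transform: apply an arbitrary RDD-to-RDD function to every batch,
# e.g. removing keys that appear in a static blocklist RDD
blockedKeys = sc.parallelize([("b", True)])
filtered = stream1.transform(lambda rdd: rdd.subtractByKey(blockedKeys))

# join: join two (key, value) DStreams batch by batch
joined = filtered.join(stream2)
joined.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()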
UpdateStateByKey operation
def updateFunction(newValues, runningCount):
    # newValues: list of new values for this key in the current batch
    # runningCount: previous state for this key (None the first time the key appears)
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

runningCounts = pairs.updateStateByKey(updateFunction)
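Note that stateful operations such as updateStateByKey require a checkpoint directory to be configured before the computation starts; the path below is just a placeholder:

ssc.checkpoint("checkpoint")  # directory for periodic state checkpoints (placeholder path)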
Window operation
- Needs two parameters:
  - window length: the duration of the window
  - sliding interval: the interval at which the window operation is performed
- Functions:
  - window
  - countByWindow
  - reduceByWindow
  - reduceByKeyAndWindow
  - countByValueAndWindow
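Both the window length and the sliding interval must be multiples of the DStream's batch interval. As a sketch, assuming pairs is a (word, 1) DStream as in the word count example, counting words over the last 30 seconds every 10 seconds could look like:

# Reduce over a 30-second window, sliding every 10 seconds.
# The second (inverse) function lets Spark subtract values leaving the window
# instead of recomputing the whole window; it requires checkpointing.
windowedWordCounts = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,   # add new values entering the window
    lambda a, b: a - b,   # remove old values leaving the window
    30,                   # window length in seconds
    10)                   # sliding interval in seconds
windowedWordCounts.pprint()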
Output operations on DStreams
- pprint: print the first ten elements of every batch of data in a DStream
- foreachRDD: apply an arbitrary function to each RDD; the function itself is executed in the driver process
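A sketch of foreachRDD, assuming counts is the word-count DStream from the example below; the printing logic is a placeholder for writing to an external system:

def sendToExternalSystem(time, rdd):
    # The function itself runs in the driver; RDD operations still run on the executors.
    # take(10) pulls a small sample back to the driver as a stand-in for a real sink.
    for record in rdd.take(10):
        print(time, record)

counts.foreachRDD(sendToExternalSystem)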
A simple network wordcount example
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with a 1-second batch interval
sc = SparkContext("local[*]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Input DStream: lines of text from a socket source on localhost:9999
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel.MEMORY_AND_DISK)

# Transformations: split lines into words and count each word per batch
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

# Output the results, then start the computation
counts.pprint()
ssc.start()
ssc.awaitTermination()
Deploy your application
- Type git clone https://github.com/SakuraSakura/spark-project.git to clone the base code of the examples
- Type nc -lk 9999 to start a simple socket server
- Type bin/spark-submit wordcount.py to start your application