Distributed Computing made easier
Alexandre Beaulne
- easy vs easier
- ambitious
- moving target
- define success of talk
Agenda
- context
- theory
- libraries
- demo
Context
explosion in amount of data produced
“Every two days now we create as much information as we did from the dawn of civilization up until 2003”
- Eric Schmidt, Google's chairman, Techonomy conference, August 2010
Context
Moore's law in jeopardy
In 1965, Gordon Moore predicted that the density of transistors on integrated circuits would double roughly every two years
Two approaches:
- scaling vertically
- scaling horizontally
“Scaling up is trying to make a machine feel like 100,
scaling out is trying to make 100 machines feel like 1”
MapReduce: Simplified Data Processing on Large Clusters
Jeffrey Dean and Sanjay Ghemawat, 2008
2006-2011: Apache Hadoop makes the MapReduce model available to the masses
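To make the model concrete, here is a minimal single-process sketch of MapReduce-style word count (plain Python, not the Hadoop API; names are mine for illustration):

from itertools import groupby
from operator import itemgetter

def map_phase(line):
    # emit a (word, 1) pair for every word in the input line
    return [(word, 1) for word in line.split()]

def reduce_phase(pairs):
    # stand-in for the shuffle/sort step: group pairs by word, then sum counts
    by_key = sorted(pairs, key=itemgetter(0))
    return {word: sum(count for _, count in group)
            for word, group in groupby(by_key, key=itemgetter(0))}

lines = ['the quick brown fox', 'the lazy dog']
pairs = [pair for line in lines for pair in map_phase(line)]
print(reduce_phase(pairs))  # {'brown': 1, 'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}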
Digression
Latency Comparison Numbers

                                        ns           us       ms
L1 cache reference                      0.5
Branch mispredict                       5
L2 cache reference                      7
Mutex lock/unlock                       25
Main memory reference                   100
Compress 1K bytes with Zippy            3,000        3
Send 1K bytes over 1 Gbps network       10,000       10
Read 4K randomly from SSD*              150,000      150
Read 1 MB sequentially from memory      250,000      250
Round trip within same datacenter       500,000      500
Read 1 MB sequentially from SSD*        1,000,000    1,000    1
Disk seek                               10,000,000   10,000   10
Read 1 MB sequentially from disk        20,000,000   20,000   20
Send packet CA->Netherlands->CA         150,000,000  150,000  150

Src: "Software Engineering Advice from Building Large-Scale Distributed Systems", Jeffrey Dean
Digression
If L1 access is a second, then:
L1 cache reference                   0:00:01
Branch mispredict                    0:00:10
L2 cache reference                   0:00:14
Mutex lock/unlock                    0:00:50
Main memory reference                0:03:20
Compress 1K bytes with Zippy         1:40:00
Send 1K bytes over 1 Gbps network    5:33:20
Read 4K randomly from SSD            3 days, 11:20:00
Read 1 MB sequentially from memory   5 days, 18:53:20
Round trip within same datacenter    11 days, 13:46:40
Read 1 MB sequentially from SSD      23 days, 3:33:20
Disk seek                            231 days, 11:33:20
Read 1 MB sequentially from disk     462 days, 23:06:40
Send packet CA->Netherlands->CA      3472 days, 5:20:00
Src: "Software Engineering Advice from Building Large-Scale Distributed Systems", Jeffrey Dean
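The scaled figures above come from multiplying each latency by 2 x 10^9 (so that 0.5 ns maps to one second). A quick sanity check in Python on a few of the entries:

from datetime import timedelta

# 0.5 ns -> 1 s means every nanosecond becomes 2 seconds
latencies_ns = [
    ('Main memory reference', 100),
    ('Disk seek', 10000000),
    ('Send packet CA->Netherlands->CA', 150000000),
]
for name, ns in latencies_ns:
    print('{}: {}'.format(name, timedelta(seconds=ns * 2)))
# Main memory reference: 0:03:20
# Disk seek: 231 days, 11:33:20
# Send packet CA->Netherlands->CA: 3472 days, 5:20:00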
2009-2014: Apache Spark, developed by the AMPLab at UC Berkeley
Objectives:
- Avoid disk reads whenever possible
  - unless instructed to, Spark only spills to disk when it runs out of memory
- Support flexible workflows
  - instead of map-reduce-map-reduce-map-reduce, write map-filter-map-map-join-reduce-map-reduce (sketched below)
- Offer simple interfaces for a wide range of users
  - written in Scala, but with Scala, Java, Python, R, SQL, and REPL interfaces
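A hedged sketch of such a multi-step pipeline in PySpark, assuming the pyspark shell's sc and made-up CSV inputs (views.csv with "user,method" rows, users.csv with "user,country" rows):

views = (sc.textFile('views.csv')
           .map(lambda line: line.split(','))              # map: parse CSV
           .filter(lambda t: t[1] == 'GET')                # filter: keep GET requests
           .map(lambda t: (t[0], 1)))                      # map: (user, 1) pairs
users = (sc.textFile('users.csv')
           .map(lambda line: tuple(line.split(',')[:2])))  # (user, country) pairs
counts = views.reduceByKey(lambda a, b: a + b)             # reduce: hits per user
result = (counts.join(users)                               # join: (user, (hits, country))
                .map(lambda kv: (kv[1][1], kv[1][0]))      # map: (country, hits)
                .reduceByKey(lambda a, b: a + b))          # reduce: hits per country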
RDD
- Resilient Distributed Dataset
- RDDs are logical collections of data partitioned across machines
- RDDs are the essence of Spark
- RDDs are the interface to your data
- RDDs are immutable
- RDDs are lazily evaluated
Input/Output
- Text, CSV, JSON
- HDFS
- JDBC, HBase, Cassandra
- many more!
RDD
- Two types of operations on RDDs (illustrated after the lists):

Transformations:
- map()
- filter()
- flatMap()
- sample()
- distinct()
- sortByKey()
- union()
- intersection()
- subtract()
- join()
- cartesian()

Actions:
- collect()
- count()
- countByValue()
- take()
- reduce()
- saveAsTextFile()
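The distinction matters because transformations are lazy: they only describe a new RDD, while actions trigger the actual computation. A minimal sketch, assuming the pyspark shell's sc:

nums = sc.parallelize(range(10))           # nothing computed yet
evens = nums.filter(lambda n: n % 2 == 0)  # transformation: still nothing computed
print(evens.count())                       # action: the job actually runs -> 5
print(evens.collect())                     # another action -> [0, 2, 4, 6, 8]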
Quick Example
$ head nasa.log
in24.inetnebr.com - - [01/Aug/2015:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839
uplherc.upl.com - - [01/Aug/2015:00:00:07 -0400] "GET / HTTP/1.0" 304 0
uplherc.upl.com - - [01/Aug/2015:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0
uplherc.upl.com - - [01/Aug/2015:00:00:08 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304 0
uplherc.upl.com - - [01/Aug/2015:00:00:08 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 304 0
ix-esc-ca2-07.ix.netcom.com - - [01/Aug/2015:00:00:09 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1713
uplherc.upl.com - - [01/Aug/2015:00:00:10 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 304 0
slppp6.intermind.net - - [01/Aug/2015:00:00:10 -0400] "GET /history/skylab/skylab.html HTTP/1.0" 200 1687
piweba4y.prodigy.com - - [01/Aug/2015:00:00:10 -0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853
slppp6.intermind.net - - [01/Aug/2015:00:00:11 -0400] "GET /history/skylab/skylab-small.gif HTTP/1.0" 200 9202
Quick Example
$ pyspark
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
>>>
I'm using pyspark here, but everything shown would equally be possible in Scala or R
Quick Example
>>> from datetime import datetime
>>> log_rdd = sc.textFile('nasa.log')
>>> tokens_rdd = log_rdd.map(lambda line: line.split(' '))
>>> hours_rdd = tokens_rdd.map(lambda tokens: datetime.strptime(tokens[3], '[%d/%b/%Y:%H:%M:%S').hour)
>>> hours_rdd.countByValue()
{0: 47862, 1: 38531, 2: 32508, 3: 29995, 4: 26756, 5: 27587, 6: 31287, 7: 47386, 8: 65443, 9: 78695, 10: 88309, 11: 95344, 12: 105143, 13: 104536, 14: 101394, 15: 109465, 16: 99527, 17: 80834, 18: 66809, 19: 59315, 20: 59944, 21: 57985, 22: 60673, 23: 54570}
Persistence
- By default, the complete RDD lineage is recomputed every time an action is called
- The persist() method comes in handy when we know work will be done repeatedly on an RDD
- An RDD can be persisted either in memory or on disk
- cache() is an alias for persisting in memory
Quick Example
>>> from datetime import datetime
>>> log_rdd = sc.textFile('nasa.log')
>>> tokens_rdd = log_rdd.map(lambda line: line.split(' '))
>>> hours_rdd = tokens_rdd.map(lambda tokens: datetime.strptime(tokens[3], '[%d/%b/%Y:%H:%M:%S').hour)
>>> hours_rdd.countByValue()
{0: 47862, 1: 38531, 2: 32508, 3: 29995, 4: 26756, 5: 27587, 6: 31287, 7: 47386, 8: 65443, 9: 78695, 10: 88309, 11: 95344, 12: 105143, 13: 104536, 14: 101394, 15: 109465, 16: 99527, 17: 80834, 18: 66809, 19: 59315, 20: 59944, 21: 57985, 22: 60673, 23: 54570}
>>> hours_rdd.distinct().collect()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23]
Quick Example
>>> from datetime import datetime
>>> log_rdd = sc.textFile('nasa.log')
>>> tokens_rdd = log_rdd.map(lambda line: line.split(' '))
>>> hours_rdd = tokens_rdd.map(lambda tokens: datetime.strptime(tokens[3], '[%d/%b/%Y:%H:%M:%S').hour)
>>> hours_rdd.cache()
>>> hours_rdd.countByValue()
{0: 47862, 1: 38531, 2: 32508, 3: 29995, 4: 26756, 5: 27587, 6: 31287, 7: 47386, 8: 65443, 9: 78695, 10: 88309, 11: 95344, 12: 105143, 13: 104536, 14: 101394, 15: 109465, 16: 99527, 17: 80834, 18: 66809, 19: 59315, 20: 59944, 21: 57985, 22: 60673, 23: 54570}
>>> hours_rdd.distinct().collect()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23]
Spark SQL and Dataframes
- Introduced in February 2015
- Distributed equivalent of R and Pandas' dataframes*
- Offer a more powerful and expressive interface
people.json
{"name": "Zhang", "age": 23, "address": {"city": "Columbus", "state": "Ohio"}}
{"name": "Michael", "age": 34, "address": {"city": null, "state": "California"}}
Dataframe API:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
people_df = sqlContext.jsonFile("people.json")
print(people_df.groupBy().avg('age').collect())
SQL API:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
people_df = sqlContext.jsonFile("people.json")
people_df.registerTempTable("people")

upper_first_letter = lambda string: string[0].upper() + string[1:]
sqlContext.registerFunction('upper_first_letter', upper_first_letter)

nameAndAddress = sqlContext.sql("""
    SELECT upper_first_letter(name), address.city, address.state FROM people
""")
for name, city, state in nameAndAddress.collect():
    print('{} - {} - {}'.format(name, city, state))
MLlib
Built-in library for Machine Learning. Supports:
- logistic regression and linear support vector machine (SVM)
- classification and regression trees
- random forest and gradient-boosted trees
- clustering via k-means (sketched below), bisecting k-means, Gaussian mixtures (GMM)
- singular value decomposition (SVD) and QR decomposition
- principal component analysis (PCA)
- summary statistics and hypothesis testing
- feature transformations
- model evaluation and hyper-parameter tuning
- and many more
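As a hedged sketch of the RDD-based MLlib API (Spark 1.x), k-means on a toy dataset, assuming the pyspark shell's sc (the points and k=2 are made up for illustration):

from pyspark.mllib.clustering import KMeans

# toy dataset: two obvious clusters, around (0, 0) and (10, 10)
points = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 10.0], [10.0, 9.0]])
model = KMeans.train(points, k=2, maxIterations=10)
print(model.clusterCenters)       # the two learned centroids
print(model.predict([0.5, 0.5]))  # cluster index for a new point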
Spark Streaming
- Process large streams of data in near real-time
- Chop up the live stream into batches of X seconds (as low as 0.5 s)
- Treat each micro-batch as an RDD and process it using RDD operations (sketched below)
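A minimal sketch of the DStream API, assuming the pyspark shell's sc and a text source on a local socket (the port and 1-second batch interval are arbitrary):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=1)      # 1-second micro-batches
lines = ssc.socketTextStream('localhost', 9999)  # hypothetical source
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))  # word count per micro-batch
counts.pprint()                                   # print each micro-batch's result
ssc.start()
ssc.awaitTermination()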
HKID check digit
A182361(5)
CA182361(1)
HKID check digit algorithm
- if the HKID is eight characters long: checksum = Σ_{i=0}^{7} (9 − i) × CHAR2INT[HKID[i]]
- if the HKID is seven characters long: checksum = 9 × 36 + Σ_{i=0}^{6} (8 − i) × CHAR2INT[HKID[i]]
- the check digit is given by: checkdigit = INT2CHAR[11 − (checksum mod 11)]
Where:

CHAR2INT maps characters to integers:
'0' -> 0, '1' -> 1, '2' -> 2, ..., '9' -> 9, 'A' -> 10, 'B' -> 11, ..., 'Z' -> 35

INT2CHAR maps integers to characters:
1 -> '1', 2 -> '2', ..., 9 -> '9', 10 -> 'A', 11 -> '0'
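Worked check for A182361 (seven characters): checksum = 9 × 36 + (8×10 + 7×1 + 6×8 + 5×2 + 4×3 + 3×6 + 2×1) = 324 + 177 = 501; 501 mod 11 = 6, and INT2CHAR[11 − 6] = '5', matching A182361(5).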
in Python
from random import randint, random, choice

LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
CHARS = '0123456789' + LETTERS
CHAR2INT = {k: v for k, v in zip(CHARS, range(36))}
INT2CHAR = {k: v for k, v in zip(range(1, 12), '123456789A0')}

def checkdigit(hkid):
    # eight characters: leading char has weight 9; seven: implicit space, 9 * 36 = 324
    checksum = CHAR2INT[hkid[0]] * 9 if len(hkid) == 8 else 324
    # the last seven characters carry weights 8 down to 2
    checksum += sum((8 - i) * CHAR2INT[x] for i, x in enumerate(hkid[-7:]))
    return INT2CHAR[11 - checksum % 11]

def gen_hkid():
    # zero or one extra leading letter, one letter, then six digits
    head = choice(LETTERS) if random() < 0.5 else ''
    neck = choice(LETTERS)
    tail = randint(0, 999999)
    id = '{}{}{:06d}'.format(head, neck, tail)
    return '{}({})'.format(id, checkdigit(id))
in plain Python
import sys

if __name__ == '__main__':
    indices = xrange(int(sys.argv[1]))
    hkids = (gen_hkid() for i in indices)
    for hkid in hkids:
        print(hkid)
with Spark RDD
import sys
from pyspark import SparkContext

if __name__ == '__main__':
    sc = SparkContext(master='spark://ec2-54-229-175-30.eu-west-1.compute.amazonaws.com:7077',
                      appName='snk')
    # numSlices=20 matches the benchmark cluster's 10 instances x 2 cores
    indices_rdd = sc.range(0, int(sys.argv[1]), numSlices=20)
    hkids_rdd = indices_rdd.map(lambda _index: gen_hkid())
    for hkid in hkids_rdd.collect():
        print(hkid)
Sample size      Plain Python   Spark RDD*
7,264,100        ~120 s         ~120 s
1,375,000,000    ~6 h           ~18 min

* Spark cluster on AWS EC2 with 10 m1.large instances, 2 cores per instance
Deploying
- Databricks' Spark-as-a-Service
- Spark Amazon AWS EC2 deploy scripts
- Spark local-cluster deploy scripts
- Flintrock, a command-line tool for launching Spark clusters on EC2
The biggest Spark clusters in production are in China
- Baidu with > 1000 nodes
- Tencent (WeChat/Weixin) with 8,000 nodes
- Alibaba was an early adopter at v0.6
Source: Reynold Xin, Cofounder Databricks, SparkConf Singapore 2015
Distributed Computing made easier
Alexandre Beaulne