Ville Seppänen @Vilsepi | Jari Voutilainen @Zharktas | @GoforeOy
All presentation material is available at https://github.com/gofore/aws-emr
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input my/Input/Directories \
    -output my/Output/Directory \
    -mapper myMapperProgram.py \
    -reducer myReducerProgram.py
cat input_data.txt | mapper.py | sort | reducer.py > output_data.txt
The endlessly fascinating example of counting words in Hadoop
#!/usr/bin/python
import sys
import re

pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
for line in sys.stdin:
    for word in pattern.findall(line):
        print "LongValueSum:" + word.lower() + "\t" + "1"
LongValueSum:i 1
LongValueSum:count 1
LongValueSum:words 1
LongValueSum:with 1
LongValueSum:hadoop 1
<link>
  <linkno>310102</linkno>
  <startsite>1108</startsite>
  <endsite>1107</endsite>
  <name language="en">Hallila -> Kaukajärvi</name>
  <name language="fi">Hallila -> Kaukajärvi</name>
  <name language="sv">Hallila -> Kaukajärvi</name>
  <distance>
    <value>3875.000</value>
    <unit>m</unit>
  </distance>
</link>
Static link information (271 kB XML)
642 one-way links, 243 sites
<ivjtdata duration="60" periodstart="2014-06-24T02:55:00Z">
  <recognitions>
    <link id="110302" data_source="1">
      <recognition offset_seconds="8" travel_time="152"></recognition>
      <recognition offset_seconds="36" travel_time="155"></recognition>
    </link>
    <link id="410102" data_source="1">
      <recognition offset_seconds="6" travel_time="126"></recognition>
      <recognition offset_seconds="45" travel_time="152"></recognition>
    </link>
    <link id="810502" data_source="1">
      <recognition offset_seconds="25" travel_time="66"></recognition>
      <recognition offset_seconds="34" travel_time="79"></recognition>
      <recognition offset_seconds="35" travel_time="67"></recognition>
      <recognition offset_seconds="53" travel_time="58"></recognition>
    </link>
  </recognitions>
</ivjtdata>
Each file contains the passthroughs completed on each road segment during one minute.
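A mapper over these per-minute files could, for example, emit the mean travel time per link. A sketch using the standard library's ElementTree (element and attribute names follow the sample above; the function name is ours):

```python
import xml.etree.ElementTree as ET

def mean_travel_times(xml_text):
    """Yield (link id, mean travel time in seconds) for one
    per-minute Digitraffic file."""
    root = ET.fromstring(xml_text)
    for link in root.iter("link"):
        times = [int(r.get("travel_time")) for r in link.iter("recognition")]
        if times:
            yield link.get("id"), float(sum(times)) / len(times)
```

In a streaming mapper this would be wrapped in a loop over `sys.stdin`, printing one `link_id<TAB>travel_time` pair per line.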
{
  "sites": [
    {
      "id": "1205",
      "name": "Viinikka",
      "lat": 61.488282,
      "lon": 23.779057,
      "rno": "3495",
      "tro": "3495/1-2930"
    }
  ],
  "links": [
    {
      "id": "99001041",
      "name": "Hallila -> Viinikka",
      "dist": 5003.0,
      "site_start": "1108",
      "site_end": "1205"
    }
  ]
}

Static link information (120 kB JSON)
{
  "date": "2014-06-01T02:52:00Z",
  "recognitions": [
    {
      "id": "4510201",
      "tt": 117,
      "cars": 4,
      "itts": [100, 139, 121, 110]
    }
  ]
}
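Joining the static link distances against the per-minute recognitions yields average speeds. A hedged sketch (field names are taken from the samples above; the function name and the km/h conversion are ours):

```python
import json

def link_speeds(links_json, recognitions_json):
    """Yield (link id, average speed in km/h) by joining static link
    distances (metres) with per-minute mean travel times (seconds)."""
    dist_by_id = {link["id"]: link["dist"]
                  for link in json.loads(links_json)["links"]}
    for rec in json.loads(recognitions_json)["recognitions"]:
        dist = dist_by_id.get(rec["id"])
        if dist and rec["tt"]:
            # metres / seconds = m/s; * 3.6 converts to km/h
            yield rec["id"], dist / rec["tt"] * 3.6
```

In the EMR jobs, the static link file would be distributed to the nodes as a cache file so each mapper can do this join locally.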
#!/usr/bin/env python
import boto.emr
from boto.emr.instance_group import InstanceGroup

# Requires that AWS API credentials have been exported as env variables
connection = boto.emr.connect_to_region('eu-west-1')
instance_groups = []
instance_groups.append(InstanceGroup(
    role="MASTER", name="Main node",
    type="m1.medium", num_instances=1,
    market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
    role="CORE", name="Worker nodes",
    type="m1.medium", num_instances=3,
    market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
    role="TASK", name="Optional spot-price nodes",
    type="m1.medium", num_instances=20,
    market="SPOT", bidprice=0.012))
cluster_id = connection.run_jobflow(
    "Our awesome cluster",
    instance_groups=instance_groups,
    action_on_failure='CANCEL_AND_WAIT',
    keep_alive=True,
    enable_debugging=True,
    log_uri="s3://our-s3-bucket/logs/",
    ami_version="3.3.1",
    bootstrap_actions=[],
    ec2_keyname="name-of-our-ssh-key",
    visible_to_all_users=True,
    job_flow_role="EMR_EC2_DefaultRole",
    service_role="EMR_DefaultRole")
steps = []
steps.append(boto.emr.step.StreamingStep(
    "Our awesome streaming app",
    input="s3://our-s3-bucket/our-input-data",
    output="s3://our-s3-bucket/our-output-path/",
    mapper="our-mapper.py",
    reducer="aggregate",
    cache_files=[
        "s3://our-s3-bucket/programs/our-mapper.py#our-mapper.py",
        "s3://our-s3-bucket/data/our-dictionary.json#our-dictionary.json"
    ],
    action_on_failure='CANCEL_AND_WAIT',
    jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'))

connection.add_jobflow_steps(cluster_id, steps)
#!/usr/bin/env python
import boto.emr
from boto.emr.instance_group import InstanceGroup

connection = boto.emr.connect_to_region('eu-west-1')
cluster_id = connection.run_jobflow(**cluster_parameters)
connection.add_jobflow_steps(cluster_id, steps)
# Create new cluster
aws-tools/run-jobs.py create-cluster "Car speed counting cluster"

Starting cluster j-F0K0A4Q9F5O0 Car speed counting cluster
# Add job step to the cluster
aws-tools/run-jobs.py run-step j-F0K0A4Q9F5O0 \
    05-car-speed-for-time-of-day_map.py \
    digitraffic/munged/links-by-month/2014

Step will output data to s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/
# Download and concatenate output
aws s3 cp s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/ \
    /tmp/emr --recursive --profile hadoop-seminar-emr
cat /tmp/emr/part-* > /tmp/emr/output
# Analyze results
result-analysis/05_speed_during_day/05-car-speed-for-time-of-day_output.py \
    /tmp/emr/output example-data/locationdata.json