
Amazon Elastic MapReduce

Ville Seppänen @Vilsepi | Jari Voutilainen @Zharktas | @GoforeOy

Agenda

  • Introduction to Hadoop Streaming and Elastic MapReduce
  • Simple EMR web interface demo
  • Introduction to our dataset
  • Using EMR from the command line with boto

All presentation material is available at https://github.com/gofore/aws-emr

Hadoop Streaming

  • Utility that allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input   my/Input/Directories \
    -output  my/Output/Directory  \
    -mapper  myMapperProgram.py   \
    -reducer myReducerProgram.py
# Locally the same logic is just a Unix pipeline (sort simulates the shuffle phase):
cat input_data.txt | ./mapper.py | sort | ./reducer.py > output_data.txt
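
The reducer is likewise just a script that reads the mapper's sorted key/value lines from stdin. As a minimal sketch (the word-count demo below uses Hadoop's built-in aggregate reducer instead of a custom script), such a reducer could look like this:

#!/usr/bin/env python
# Minimal word-count reducer sketch: input lines are "word<TAB>count",
# sorted by key, so all counts for the same word arrive consecutively.
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print("{0}\t{1}".format(current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print("{0}\t{1}".format(current_word, current_count))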

Elastic MapReduce (EMR)

Amazon Elastic MapReduce

  • Hadoop-based MapReduce cluster as a service
  • Can run either Amazon-optimized Hadoop or MapR
  • Managed from a web UI or through an API

Hadoop streaming in EMR

Quick EMR demo

The endlessly fascinating example of counting words in Hadoop

Cluster creation steps

  • Cluster: name, logging
  • Tags: keywords for the cluster
  • Software: Hadoop distribution and version, pre-installed applications (Hive, Pig,...)
  • File System: encryption, consistency
  • Hardware: number and type of instances
  • Security and Access: ssh keys, node access roles
  • Bootstrap Actions: scripts to customize the cluster
  • Steps: a queue of MapReduce jobs for the cluster

WordSplitter.py (mapper)

#!/usr/bin/python
import sys
import re

pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
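# The "LongValueSum:" prefix is recognized by Hadoop's built-in aggregate
# reducer, which sums the emitted values for each key.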
for line in sys.stdin:
    for word in pattern.findall(line):
        print "LongValueSum:" + word.lower() + "\t" + "1"
LongValueSum:i         1
LongValueSum:count     1
LongValueSum:words     1
LongValueSum:with      1
LongValueSum:hadoop    1

Filesystems

  • EMRFS is an implementation of HDFS that allows reading and writing files directly to S3.
  • HDFS should be used for caching the results of intermediate steps.
  • Hadoop's legacy S3 block filesystem (s3) is block-based just like HDFS; the S3 native filesystem (s3n) stores plain files that can be accessed with other tools, but file size is limited to 5 GB.
  • S3 itself is not a file system; it is a REST-style object store.
  • S3 is eventually consistent: files written to S3 might not be immediately available for reading.
  • EMRFS can be configured to encrypt files in S3 and to monitor their consistency, which lets it detect operations that try to use inconsistent files.

http://wiki.apache.org/hadoop/AmazonS3

Our dataset

  • Digitraffic is a service offering real-time information about traffic, weather and road conditions on the Finnish main roads.
  • The service is provided by the Finnish Transport Agency (Liikennevirasto) and produced by Gofore and Infotripla.

Traffic fluency

Travel time link network

<link>
  <linkno>310102</linkno>
  <startsite>1108</startsite>
  <endsite>1107</endsite>
  <name language="en">Hallila -> Kaukajärvi</name>
  <name language="fi">Hallila -> Kaukajärvi</name>
  <name language="sv">Hallila -> Kaukajärvi</name>
  <distance>
    <value>3875.000</value>
    <unit>m</unit>
  </distance>
</link>

Static link information (271 kB XML)

642 one-way links, 243 sites
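
For illustration, the static link XML can be loaded into a lookup table with Python's ElementTree (a rough sketch; the file name locationdata.xml is an assumption):

#!/usr/bin/env python
# Sketch: build a {link id -> (name, distance in metres)} lookup table from
# the static link XML. The file name "locationdata.xml" is assumed.
import xml.etree.ElementTree as ET

links = {}
for link in ET.parse("locationdata.xml").getroot().iter("link"):
    link_id = link.findtext("linkno")
    name = link.find("name[@language='fi']").text
    distance = float(link.findtext("distance/value"))
    links[link_id] = (name, distance)

print(links["310102"])   # e.g. ('Hallila -> Kaukajarvi', 3875.0)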

<ivjtdata duration="60" periodstart="2014-06-24T02:55:00Z">
  <recognitions>
    <link id="110302" data_source="1">
      <recognition offset_seconds="8"  travel_time="152"></recognition>
      <recognition offset_seconds="36" travel_time="155"></recognition>
    </link>
    <link id="410102" data_source="1">
      <recognition offset_seconds="6"  travel_time="126"></recognition>
      <recognition offset_seconds="45" travel_time="152"></recognition>
    </link>
    <link id="810502" data_source="1">
      <recognition offset_seconds="25" travel_time="66"></recognition>
      <recognition offset_seconds="34" travel_time="79"></recognition>
      <recognition offset_seconds="35" travel_time="67"></recognition>
      <recognition offset_seconds="53" travel_time="58"></recognition>
    </link>
  </recognitions>
</ivjtdata>

Each file contains the finished pass-throughs of each road segment during one minute.

Some numbers

  • 6.5 years worth of data from January 2008 to June 2014
  • 3.9 million XML files (525600 files per year)
  • 6.3 GB of compressed archives (with 7.5GB of additional median data as CSV)
  • 42 GB of data as XML (and 13 GB as CSV)

Potential research questions

  • Do people drive faster during the night?
  • Does winter have fewer recognitions (either due to fewer cars or snowy plates)?
  • How well does the number of recognitions correlate with speed (rush hour probably slows travel, but are speeds higher on days with less traffic)?
  • Is it possible to identify speed limits from the travel times?
  • How much dispersion is there in speeds?
  • When do speed limits change (winter and summer limits)?

Munging

The small files problem

  • Unpacked the tar.gz archives and uploaded the XML files as such to S3 (using AWS CLI tools).
  • Turns out that 4 million ~11 kB files and Hadoop are not a fun combination: Hadoop does not handle files significantly smaller than the HDFS block size (64 MB by default) well [1] [2] [3]
  • And well, XML is not fun either, so...

JSONify all the things!

  • Wrote scripts to parse, munge and upload data
  • Concatenated data into bigger files, calculated some extra data, and converted it into JSON. Size reduced to 60% of the original XML.
  • First munged 1-day files (10-20MB each) and later 1-month files (180-540MB each)
  • Munging 6.5 years' worth of XML takes about 8.5 hours on a single t2.medium instance

{
  "sites": [
    {
     "id": "1205",
     "name": "Viinikka",
     "lat": 61.488282,
     "lon": 23.779057,
     "rno": "3495",
     "tro": "3495/1-2930"
    }
  ],
  "links": [
    {
      "id": "99001041",
      "name": "Hallila -> Viinikka",
      "dist": 5003.0,
      "site_start": "1108",
      "site_end": "1205"
    }
  ]
}

Static link information (120 kB JSON)

{
  "date": "2014-06-01T02:52:00Z",
  "recognitions": [
    {
      "id": "4510201",
      "tt": 117,
      "cars": 4,
      "itts": [
        100,
        139,
        121,
        110
      ]
    }
  ]
}
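
The actual munging scripts are in the repository; purely as an illustration, converting one per-minute XML file into the JSON shape above could look roughly like this (the meanings of tt, cars and itts are assumptions: median travel time, number of recognitions and individual travel times):

#!/usr/bin/env python
# Sketch: convert one per-minute recognitions XML file into the JSON format above.
# The field meanings (tt, cars, itts) are assumptions made for illustration.
import json
import sys
import xml.etree.ElementTree as ET

def munge(xml_path):
    root = ET.parse(xml_path).getroot()
    entry = {"date": root.get("periodstart"), "recognitions": []}
    for link in root.iter("link"):
        times = sorted(int(r.get("travel_time")) for r in link.iter("recognition"))
        entry["recognitions"].append({
            "id": link.get("id"),
            "tt": times[len(times) // 2],   # median travel time (assumed)
            "cars": len(times),
            "itts": times,
        })
    return entry

if __name__ == "__main__":
    print(json.dumps(munge(sys.argv[1])))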

Programming EMR

Alternatives to the web interface

Connect to EMR

#!/usr/bin/env python

import boto.emr
from boto.emr.instance_group import InstanceGroup

# Requires that AWS API credentials have been exported as env variables
connection = boto.emr.connect_to_region('eu-west-1')

Specify EC2 instances

instance_groups = []
instance_groups.append(InstanceGroup(
    role="MASTER", name="Main node",
    type="m1.medium", num_instances=1,
    market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
    role="CORE", name="Worker nodes",
    type="m1.medium", num_instances=3,
    market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
    role="TASK", name="Optional spot-price nodes",
    type="m1.medium", num_instances=20,
    market="SPOT", bidprice=0.012))

Start EMR cluster

cluster_id = connection.run_jobflow(
    "Our awesome cluster",
    instance_groups=instance_groups,
    action_on_failure='CANCEL_AND_WAIT',
    keep_alive=True,
    enable_debugging=True,
    log_uri="s3://our-s3-bucket/logs/",
    ami_version="3.3.1",
    bootstrap_actions=[],
    ec2_keyname="name-of-our-ssh-key",
    visible_to_all_users=True,
    job_flow_role="EMR_EC2_DefaultRole",
    service_role="EMR_DefaultRole")
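
run_jobflow returns the cluster id immediately; before adding steps you may want to wait until the cluster is up. A small sketch of polling the state with boto (the state names are the EMR job flow states):

import time

# Poll until the cluster is ready to accept steps.
# (A real script should also bail out on FAILED/TERMINATED states.)
state = connection.describe_jobflow(cluster_id).state
while state not in ('WAITING', 'RUNNING'):
    time.sleep(30)
    state = connection.describe_jobflow(cluster_id).state
print("Cluster %s is now %s" % (cluster_id, state))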

Add job step to cluster

steps = []
steps.append(boto.emr.step.StreamingStep(
    "Our awesome streaming app",
    input="s3://our-s3-bucket/our-input-data",
    output="s3://our-s3-bucket/our-output-path/",
    mapper="our-mapper.py",
    reducer="aggregate",
    cache_files=[
        "s3://our-s3-bucket/programs/our-mapper.py#our-mapper.py",
        "s3://our-s3-bucket/data/our-dictionary.json#our-dictionary.json",)
        ],
    action_on_failure='CANCEL_AND_WAIT',
    jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'))
connection.add_jobflow_steps(cluster_id, steps)
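
Since keep_alive=True leaves the cluster running after its steps have finished, remember to shut it down when you are done to avoid paying for idle instances:

# Terminate the cluster once all steps are done and the outputs are in S3
connection.terminate_jobflow(cluster_id)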

Recap

#!/usr/bin/env python

import boto.emr
from boto.emr.instance_group import InstanceGroup

connection = boto.emr.connect_to_region('eu-west-1')
cluster_id = connection.run_jobflow(**cluster_parameters)
connection.add_jobflow_steps(cluster_id, **steps_parameters)

Step 1 of 2: Run mapreduce

# Create new cluster
aws-tools/run-jobs.py create-cluster "Car speed counting cluster"

Starting cluster
  j-F0K0A4Q9F5O0 Car speed counting cluster

# Add job step to the cluster
aws-tools/run-jobs.py run-step j-F0K0A4Q9F5O0 \
  05-car-speed-for-time-of-day_map.py \
  digitraffic/munged/links-by-month/2014

Step will output data to
  s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/
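
The mapper 05-car-speed-for-time-of-day_map.py lives in the repository; a simplified sketch of the idea (assuming one munged JSON document per input line and a cached locationdata.json for link distances) might look like this:

#!/usr/bin/env python
# Sketch of a streaming mapper: emit (hour of day, speed in km/h) pairs.
# Assumes one munged JSON document per input line and a cached
# locationdata.json with the static link distances.
import json
import sys

with open("locationdata.json") as f:
    links = dict((l["id"], l["dist"]) for l in json.load(f)["links"])

for line in sys.stdin:
    minute = json.loads(line)
    hour = minute["date"][11:13]              # "2014-06-01T02:52:00Z" -> "02"
    for rec in minute["recognitions"]:
        dist = links.get(rec["id"])
        if not dist or not rec["tt"]:
            continue
        speed_kmh = dist / rec["tt"] * 3.6    # m/s -> km/h
        # Aggregation into averages happens in the analysis script
        print("{0}\t{1:.1f}".format(hour, speed_kmh))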

Step 2 of 2: Analyze results

# Download and concatenate output
aws s3 cp \
  s3://hadoop-seminar-emr/digitraffic/outputs/2015-02-18_11-08-24_05-car-speed-for-time-of-day_map.py/ \
  /tmp/emr \
  --recursive \
  --profile hadoop-seminar-emr
cat /tmp/emr/part-* > /tmp/emr/output

# Analyze results
result-analysis/05_speed_during_day/05-car-speed-for-time-of-day_output.py \
  /tmp/emr/output \
  example-data/locationdata.json
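
The analysis script in the repository does the real work; a minimal sketch of averaging the concatenated mapper output (assuming the simplified "hour<TAB>speed" lines from the mapper sketch above):

#!/usr/bin/env python
# Sketch: average speed per hour of day from the concatenated mapper output.
# Usage (hypothetical script name): ./average_speeds.py /tmp/emr/output
import sys
from collections import defaultdict

totals, counts = defaultdict(float), defaultdict(int)
with open(sys.argv[1]) as f:
    for line in f:
        hour, speed = line.split("\t")
        totals[hour] += float(speed)
        counts[hour] += 1

for hour in sorted(totals):
    print("{0}:00  {1:.1f} km/h".format(hour, totals[hour] / counts[hour]))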

Some statistics

  • We experimented with different input files and cluster sizes
  • Execution time was about half an hour with a small cluster and 30 small (15-20 MB) files
  • The same input parsed with a simple Python script took about 5 minutes
  • A larger cluster and 6 larger (500 MB) files took 17 minutes
  • "Too small problem for EMR/Hadoop"

Summary

Takeaways

  • Make sure your problem is big enough for Hadoop
  • Munging wisely makes streaming programs easier and faster
  • Always use Spot instances with EMR

Further reading