Tuples – All – The Way



Tuples – All – The Way

0 0


scalding-talk

An introduction to scala, hadoop, cascading, and scalding.

On Github giokincade / scalding-talk

Tuples

All

The Way

Down

Hi

  • My name is Gio and I work at Etsy.
  • You may have heard about us on Yahoo Finance or something.
  • Marketplace for unique creative goods.
  • 1.4 Million active sellers
  • 20 Million active buyers
  • 2 Billion in Sales last year.
  • We get some traffic. We've built a fair number of products.
  • something smart

About this Talk

Scala

Hadoop

Cascading

Scalding

Scala

Implicit Conversions

scala> "blah and blah"
res4: java.lang.String = blah and blah

scala> "blah and blah".urlencode()
error: value urlencode is not a member of java.lang.String
class WrappedString(val x:String) {
  def urlencode() = URLEncoder.encode(x)
}
implicit def stringToWrappedString(x:String): WrappedString =
  new WrappedString(x)
scala> "blah and blah".urlencode()
res2: java.lang.String = blah+and+blah

scala> stringToWrappedString("blah and blah").urlencode()
res2: java.lang.String = blah+and+blah

Implicit Parameters

def emailSignature(name:String)(implicit organization:String) = 
  "Sincerely Yours,\n" + name + "\n" + organization
scala> implicit val organization = "Etsy, Inc."
organization: java.lang.String = Etsy, Inc.
scala> emailSignature("Giovanni Fernandez-Kincade")
res3: java.lang.String =
Sincerely Yours,
Giovanni Fernandez-Kincade
Etsy, Inc.

Word Count

First Character Count

val words = List("one", "two", "three")

words
  .map {
    word =>
      Map(word.substring(0,1) -> 1)
  }
  .reduce { 
    (a, b) =>
      a ++ b.map { 
        mapTuple => 
          val (key, count) = mapTuple
          (key, a.getOrElse(key, 0) + count)
      }
  }
val words = List("one", "two", "three")

words
  .map {
    word =>
      Map(word.substring(0,1) -> 1)
  }
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
.reduce { 
  (a, b) =>
    a ++ b.map { 
      mapTuple => 
        val (key, count) = mapTuple
        (key, a.getOrElse(key, 0) + count)
    }
}
Map(o -> 1, t -> 2)

Hadoop

~> echo "one\ntwo\nthree" > ~/foo.txt
~> hadoop fs -put ~/foo.txt

Sort Of

void map(K1 key, V1 value, OutputCollector<K2,V2> output)

void reduce(K2 key, Iterator<V2> values, 
              OutputCollector<K3,V3> output)

Cascading

cascading.tuple.Tuple
val tuple = new Tuple(
    1.asInstanceOf[Object], 
    "gigi".asInstanceOf[Object], 
    "miami".asInstanceOf[Object]
)
val fields = new Fields(
  "user_id", 
  "user_name", 
  "location"
)
val one = new TupleEntry(
  fields,
  tuple
)
> one.get("user_id")
res4: java.lang.Comparable[_] = 1

> one.get("user_name")
res5: java.lang.Comparable[_] = gigi

First Character Count 2.0

def cascadingTuple[T](fieldName:String, entry:T): TupleEntry = 
  new TupleEntry(
    new Fields(fieldName),
    new Tuple(entry.asInstanceOf[Object])
  )
val words = List(
  cascadingTuple("word", "one"),
  cascadingTuple("word", "two"),
  cascadingTuple("word", "three")
)
words
  .map {
    tuple => 
      cascadingTuple(
        "map",
        Map(tuple.getString("word").substring(0,1) -> 1)
      )
  }
.reduce {
  (tupleA, tupleB) =>
    val (a,b) = (
      tupleA.getObject("map").asInstanceOf[Map[String,Int]],
      tupleB.getObject("map").asInstanceOf[Map[String,Int]]
    )

    val result = a ++ b.map { 
      mapTuple =>
        val (key, count) = mapTuple
        (key, a.getOrElse(key, 0) + count)
    }

    cascadingTuple("map", result)
}
> run
fields: ['map'] tuple: ['Map(o -> 1, t -> 2)']

First Character Count 1.2

List("one", "two", "three")
  .groupBy(_.substring(0,1))
res1: Map(o -> List(one), t -> List(two, three))
List("one", "two", "three")
  .groupBy(_.substring(0,1))
  .map {
    (tuple) =>
      val (key, value) = tuple 
      (key, value.size)
  }
res1: Map(o -> 1, t -> 2)

Scalding

import com.twitter.scalding._

class ExampleJob(args: Args) 
  extends Job(args) {

  TextLine("data/words.txt")
    .map('line -> 'first_character) {
      (line:String) =>
        line.substring(0,1)
    }
    .groupBy('first_character) {
      (g: GroupBuilder) =>
        g.size('size)
    }
    .write(Tsv("data/output/characters.tsv"))
}
class ExampleJob(args: Args) 
  extends Job(args) {
TextLine("data/words.txt")
fields: ['line'] tuple: ['one']
fields: ['line'] tuple: ['two']
fields: ['line'] tuple: ['three']
tuple.getString("line")
.map('line -> 'first_character) {
  (line:String) =>
    line.substring(0,1)
}
tuple.setString("first_character", line.substring(0,1))
fields: ['line', 'first_character'] tuple: ['one', 'o']
fields: ['line', 'first_character'] tuple: ['two', 't']
fields: ['line', 'first_character'] tuple: ['three', 't']
tuple.getObject("first_character")
.groupBy('first_character) {
  (g: GroupBuilder) =>
    g.size('size)
}
tuple.setInt("size", ...)
fields: ['first_character', 'size'] tuple: ['o', 1]
fields: ['first_character', 'size'] tuple: ['t', 2]
.groupBy('first_character) {
  (g: GroupBuilder) =>
    g.size('size)
}
fields: ['first_character', 'size'] tuple: ['o', '1']
fields: ['first_character', 'size'] tuple: ['t', '2']
.write(Tsv("data/output/characters.tsv"))
o	1
t	2

Behind the Scenes

TextLine("data/words.txt")
    .map('line -> 'first_character) {

Huh?

class Job(val args: Args) 
  extends FieldConversions 
  with java.io.Serializable {

  implicit def pipeToRichPipe(pipe: Pipe): RichPipe = 
    new RichPipe(pipe)

  implicit def sourceToRichPipe(src: Source): RichPipe =
    new RichPipe(src.read)
def map[A, T](fs: (Fields, Fields))
             (fn: A => T)
             (implicit conv: TupleConverter[A], 
                setter: TupleSetter[T]): Pipe
.map('line -> 'first_character) {
def map[A, T](fs: (Fields, Fields))
implicit def symbolToFields(x: Symbol) = {
    if (x == '*) {
      Fields.ALL
    } else {
      new Fields(x.name)
    }
  }
.map('line -> 'first_character) {
  (line:String) =>
    line.substring(0,1)
}
def map[A, T](fs: (Fields, Fields))
             (fn: A => T)
implicit def tuple1Converter[A](implicit gA: TupleGetter[A]): TupleConverter[Tuple1[A]]
implicit object StringGetter extends TupleGetter[String] {
  override def get(tup: CTuple, i: Int) = tup.getString(i)
}
.map('line -> 'first_character) {
  (line:String) =>
    line.substring(0,1)
}
def map[A, T](fs: (Fields, Fields))
             (fn: A => T)
             (implicit conv: TupleConverter[A], 
                setter: TupleSetter[T]): Pipe
implicit def singleSetter[A]: TupleSetter[A] = new TupleSetter[A] {
  override def apply(arg: A) = {
    val tup = CTuple.size(1)
    tup.set(0, arg)
    tup
  }
}

What Happens When You Run it?

> runMain com.twitter.scalding.Tool
    com.giokincade.scalding.ExampleJob --local
hadoop jar foo.jar com.giokincade.scalding.ExampleJob --hdfs

Compile Time

Composition Time

Job Run Time

Typed Pipe

TypedPipe.from(
    TextLine("data/words.txt")
  )
  .map(line => line.substring(0,1)) 
  .groupBy(character => character)
  .size
  .write(
    TypedTsv[(String, Long)]("data/output/typed-characters.tsv")
  )

Resources

  • github.com/giokincade/scalding-talk-examples
  • github.com/twitter/scalding

Me

  • gio@etsy.com
  • @giokincade

Fin

Tuples All The Way Down