On Github giokincade / scalding-talk
Scala
Hadoop
Cascading
Scalding
scala> "blah and blah" res4: java.lang.String = blah and blah scala> "blah and blah".urlencode() error: value urlencode is not a member of java.lang.String
// Wrapper class adding a `urlencode` method to String (pimp-my-library pattern).
class WrappedString(val x: String) {
  // Use the charset-aware overload: the single-argument URLEncoder.encode
  // is deprecated because it relies on the platform's default encoding,
  // which makes results machine-dependent. UTF-8 is the web standard.
  def urlencode() = URLEncoder.encode(x, "UTF-8")
}
// Implicit view: lets the compiler silently convert any String into a
// WrappedString, so "foo".urlencode() resolves against WrappedString
// even though String has no such member.
implicit def stringToWrappedString(x:String): WrappedString = new WrappedString(x)
scala> "blah and blah".urlencode() res2: java.lang.String = blah+and+blah! scala> stringToWrappedString("blah and blah").urlencode() res2: java.lang.String = blah+and+blah!
// Renders a three-line email sign-off. The sender's name is explicit;
// the organization is pulled from implicit scope (second parameter list),
// so callers with an `implicit val organization` in scope omit it entirely.
def emailSignature(name:String)(implicit organization:String) =
  s"Sincerely Yours,\n$name\n$organization"
scala> implicit val organization = "Etsy, Inc." organization: java.lang.String = Etsy, Inc.
scala> emailSignature("Giovanni Fernandez-Kincade") res3: java.lang.String = Sincerely Yours, Giovanni Fernandez-Kincade Etsy, Inc.
// Word-count-by-first-letter in plain Scala collections:
//   1. map:    each word becomes a singleton Map(firstLetter -> 1)
//   2. reduce: merge consecutive maps, summing counts for keys in both
//      (`a ++ b.map{...}` keeps a's entries, overriding shared keys with
//      the summed count). Result: Map(o -> 1, t -> 2).
val words = List("one", "two", "three") words .map { word => Map(word.substring(0,1) -> 1) } .reduce { (a, b) => a ++ b.map { mapTuple => val (key, count) = mapTuple (key, a.getOrElse(key, 0) + count) } }
val words = List("one", "two", "three") words .map { word => Map(word.substring(0,1) -> 1) }
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
List(Map(o -> 1), Map(t -> 1), Map(t -> 1))
.reduce { (a, b) => a ++ b.map { mapTuple => val (key, count) = mapTuple (key, a.getOrElse(key, 0) + count) } }
Map(o -> 1, t -> 2)
~> echo "one\ntwo\nthree" > ~/foo.txt ~> hadoop fs -put ~/foo.txt
void map(K1 key, V1 value, OutputCollector<K2,V2> output) void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output)
cascading.tuple.Tuple
// Hand-build a Cascading tuple: raw positional values, boxed via
// asInstanceOf[Object] to satisfy the Java varargs Tuple constructor.
val tuple = new Tuple( 1.asInstanceOf[Object], "gigi".asInstanceOf[Object], "miami".asInstanceOf[Object] )
// Parallel list of field names — Cascading pairs these positionally
// with the tuple's values.
val fields = new Fields( "user_id", "user_name", "location" )
// TupleEntry glues names to values, enabling lookup by field name
// (e.g. one.get("user_id")) instead of by position.
val one = new TupleEntry( fields, tuple )
> one.get("user_id") res4: java.lang.Comparable[_] = 1 > one.get("user_name") res5: java.lang.Comparable[_] = gigi
// Convenience factory: builds a single-field Cascading TupleEntry pairing
// `fieldName` with `entry`. The asInstanceOf[Object] cast boxes the value
// for the Java-based Tuple API.
def cascadingTuple[T](fieldName:String, entry:T): TupleEntry = new TupleEntry( new Fields(fieldName), new Tuple(entry.asInstanceOf[Object]) )
// The same first-letter word count, re-expressed over Cascading TupleEntry
// values to mimic what a Hadoop map/reduce pipeline actually passes around.
val words = List(
  cascadingTuple("word", "one"),
  cascadingTuple("word", "two"),
  cascadingTuple("word", "three")
)
words
  .map { tuple =>
    // Map phase: each word becomes a singleton Map(firstLetter -> 1),
    // re-wrapped as a single-field tuple under the name "map".
    cascadingTuple(
      "map",
      Map(tuple.getString("word").substring(0,1) -> 1)
    )
  }
  .reduce { (tupleA, tupleB) =>
    // Unpack both sides; getObject returns AnyRef, so we cast back to the
    // Map[String,Int] the map phase produced.
    // (Bug fix: the original had a trailing comma after the second tuple
    // element, which is a syntax error in Scala 2.)
    val (a, b) = (
      tupleA.getObject("map").asInstanceOf[Map[String,Int]],
      tupleB.getObject("map").asInstanceOf[Map[String,Int]]
    )
    // Reduce phase: merge the two maps, summing counts for shared keys.
    val result = a ++ b.map { mapTuple =>
      val (key, count) = mapTuple
      (key, a.getOrElse(key, 0) + count)
    }
    cascadingTuple("map", result)
  }
> run fields: ['map'] tuple: ['Map(o -> 1, t -> 2)']
List("one", "two", "three") .groupBy(_.substring(0,1))
res1: Map(o -> List(one), t -> List(two, three))
// groupBy yields Map(firstLetter -> List(words)); the map step then
// collapses each group to its size, giving Map(o -> 1, t -> 2) — the same
// result as the map/reduce version, in two combinators.
List("one", "two", "three") .groupBy(_.substring(0,1)) .map { (tuple) => val (key, value) = tuple (key, value.size) }
res1: Map(o -> 1, t -> 2)
// Scalding field-based API version of the first-letter word count:
// read lines from a text file, project each line to its first character
// (field 'first_character), group on that character, count each group's
// size into field 'size, and write (character, size) rows as TSV.
import com.twitter.scalding._ class ExampleJob(args: Args) extends Job(args) { TextLine("data/words.txt") .map('line -> 'first_character) { (line:String) => line.substring(0,1) } .groupBy('first_character) { (g: GroupBuilder) => g.size('size) } .write(Tsv("data/output/characters.tsv")) }
class ExampleJob(args: Args) extends Job(args) {
TextLine("data/words.txt")
fields: ['line'] tuple: ['one'] fields: ['line'] tuple: ['two'] fields: ['line'] tuple: ['three']
tuple.getString("line")
.map('line -> 'first_character) { (line:String) => line.substring(0,1) }
tuple.setString("first_character", line.substring(0,1))
fields: ['line', 'first_character'] tuple: ['one', 'o'] fields: ['line', 'first_character'] tuple: ['two', 't'] fields: ['line', 'first_character'] tuple: ['three', 't']
tuple.getObject("first_character")
.groupBy('first_character) { (g: GroupBuilder) => g.size('size) }
tuple.setInt("size", ...)
fields: ['first_character', 'size'] tuple: ['o', 1] fields: ['first_character', 'size'] tuple: ['t', 2]
.groupBy('first_character) { (g: GroupBuilder) => g.size('size) }
fields: ['first_character', 'size'] tuple: ['o', '1'] fields: ['first_character', 'size'] tuple: ['t', '2']
.write(Tsv("data/output/characters.tsv"))
o 1 t 2
TextLine("data/words.txt") .map('line -> 'first_character) {
class Job(val args: Args) extends FieldConversions with java.io.Serializable { implicit def pipeToRichPipe(pipe: Pipe): RichPipe = new RichPipe(pipe) implicit def sourceToRichPipe(src: Source): RichPipe = new RichPipe(src.read)
def map[A, T](fs: (Fields, Fields)) (fn: A => T) (implicit conv: TupleConverter[A], setter: TupleSetter[T]): Pipe
.map('line -> 'first_character) {
def map[A, T](fs: (Fields, Fields))
// Implicit conversion powering Scalding's Symbol-based field syntax:
// a Scala Symbol becomes a Cascading Fields instance. The special
// symbol '* maps to the wildcard Fields.ALL; any other symbol becomes
// a single field named after it.
implicit def symbolToFields(x: Symbol) = x match {
  case '* => Fields.ALL
  case _  => new Fields(x.name)
}
.map('line -> 'first_character) { (line:String) => line.substring(0,1) }
def map[A, T](fs: (Fields, Fields)) (fn: A => T)
implicit def tuple1Converter[A](implicit gA: TupleGetter[A]): TupleConverter[Tuple1[A]]
implicit object StringGetter extends TupleGetter[String] { override def get(tup: CTuple, i: Int) = tup.getString(i) }
.map('line -> 'first_character) { (line:String) => line.substring(0,1) }
def map[A, T](fs: (Fields, Fields)) (fn: A => T) (implicit conv: TupleConverter[A], setter: TupleSetter[T]): Pipe
implicit def singleSetter[A]: TupleSetter[A] = new TupleSetter[A] { override def apply(arg: A) = { val tup = CTuple.size(1) tup.set(0, arg) tup }
> runMain com.twitter.scalding.Tool com.giokincade.scalding.ExampleJob --local
hadoop jar foo.jar com.giokincade.scalding.ExampleJob --hdfs
// Typed API version of the same job: no field-name Symbols, just typed
// values. Each line maps to its first character, identical characters are
// grouped, .size counts each group, and (String, Long) pairs are written
// to a typed TSV sink.
TypedPipe.from( TextLine("data/words.txt") ) .map(line => line.substring(0,1)) .groupBy(character => character) .size .write( TypedTsv[(String, Long)]("data/output/typed-characters.tsv") )