On Github Tarrasch / luigi-budapest-bi-oct-2015
Wednesday, October 14, 2015
Budapest BI Forum 2015
We stream the right music for the right moment.
75M+ active users. 30M+ songs. $3bn paid to rightsholders.
A task orchestrator made in house at Spotify.
Open sourced late 2012.
It is very much like GNU Make.
class MyTask(luigi.Task): some_parameter = luigi.Parameter(default="hello") def complete(self): return True or False def requires(self): return [TaskA(), TaskB(param='yay')] def run(self): print self.some_parameter, 'world'
import luigi class HelloWorldTask(luigi.Task): def run(self): with self.output().open('w') as fd: fd.write('Hello World') def output(self): return luigi.LocalTarget('hello.txt')
$ luigi --module helloworld HelloWorldTask --local-scheduler ... INFO: Scheduled HelloWorldTask() (PENDING) ... INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 ran successfully: - 1 HelloWorldTask() ===== Luigi Execution Summary ===== $ cat hello.txt Hello World
$ luigi --module helloworld HelloWorldTask --local-scheduler ... INFO: Scheduled HelloWorldTask() (DONE) ... INFO: ===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 present dependencies were encountered: - 1 HelloWorldTask() Did not run any tasks ===== Luigi Execution Summary =====
Kind of. Despite that class statements in python are statements. We've made using luigi feel declarative.
This feels like a statement in python, not a declaration
class MyNewClass(object): sys.exit(1)
So how do one achieve this declarative feel?
class MyTask(luigi.Task): def run(self): pass def complete(self): pass def requires(self): pass
$ luigi --module my_tasks MyTask
We need to bring out our Python-fu!
By having a registry!
# from the luigi library source code: @six.add_metaclass(Register) class Task(object): ... class Register(abc.ABCMeta): def __new__(metacls, classname, bases, classdict): """ Is called when a Task class is "declared". """ cls = super(Register, metacls).__new__(...) metacls._reg.append(cls) return cls
In Spotify, we've debated if we want to push Spotify Luigi to become Apache Luigi or not.
I guess that Luigi's community success comes from:
The problem of orchestration is wide
Luigi takes about 10 minutes to just try out.
story time!
When you listen to music on Spotify, we report this to labels and pay them.
When you listen to songs we save EndSong messages.
$ hadoop fs -ls /logs/EndSong/2015-08-27/06/ /cleaned/endsong/2015-08-27/06/part-00000.avro /cleaned/endsong/2015-08-27/06/part-00001.avro /cleaned/endsong/2015-08-27/06/part-00002.avro ...
The files are split into multiple avro files.
What does this do?
$ mkdir my_directcry $ mv my_file.txt my_directory
$ hadoop -jar mapreduce.jar -Poutput=output-luigi-tmp-123456 $ hadoop fs -mv output-luigi-tmp-123456 output
Now what can go wrong? ;)
$ hadoop -jar mapreduce.jar -Poutput=output-luigi-tmp-123456 $ hadoop fs -mkdir output $ hadoop fs -mv output-luigi-tmp-123456/* output
The trick is that hadoop fs -mkdir output will crash if folder exist. That's good, so no other thread can create it. But...
Just a week or so after thanksgiving, a analyst came down to the Data Engineers.
Analyst: Hey, a label reported that they received only 80% of the expected revenue for last Thursday. We think there's a bug in your systems. If labels get pissed, we will all lose our jobs!
Engineer: Well duuh, it was Thanksgiving last Thursday. Of course people listen to less music! And you're supposed to be the guys working with user behaviors.
The "fix" was reverted in spotify/luigi#557, and a proper fix was implemented in spotify/luigi#605. We used a new file system primitive with rename-dont-move semantics, also by Spotify in spotify/snakebite#128.
@Tarrasch on github