luigi-budapest-bi-oct-2015




Presentation: Luigi Task Orchestrator – three aspects

On Github Tarrasch / luigi-budapest-bi-oct-2015

Luigi Task Orchestrator

Wednesday, October 14, 2015

Budapest BI Forum 2015

Who's this guy?

  • Data Engineer at Spotify

  • I write code most of the time, but not for the sake of it!

  • Arash Rouhani, @Tarrasch on GitHub. Author of zsh-bd.

Spoti-what?

We stream the right music for the right moment.

75M+ active users. 30M+ songs. $3bn paid to rightsholders.

This talk

  • Introduction to luigi + declarative programming
  • Building up an open source community
  • Story time: a fun anecdote

Introduction to luigi

What is luigi?

github.com/Spotify/luigi

A task orchestrator made in-house at Spotify.

Open-sourced in late 2012.

It even has spin-offs ;)

github.com/luiti/luiti

github.com/intentmedia/mario

A dependency graph

Task DSL

It is very much like GNU Make.

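import luigi

class MyTask(luigi.Task):
    some_parameter = luigi.Parameter(default="hello")

    def complete(self):
        # Return True if this task's work is already done, else False
        return False

    def requires(self):
        # TaskA and TaskB stand for other luigi.Task classes
        return [TaskA(), TaskB(param='yay')]

    def run(self):
        print(self.some_parameter, 'world')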
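The Make analogy can be sketched as a toy resolver. It walks requires() depth-first, skips anything whose complete() is already True, and runs the rest in dependency order. This is a minimal sketch under simplifying assumptions, not how luigi's scheduler is implemented. `MiniTask` and `build` are hypothetical names, and real luigi also deals with parameters, workers and failures.

```python
# Hypothetical mini-orchestrator for illustration only: NOT luigi's implementation.
class MiniTask:
    """Stand-in for a task exposing requires()/complete()/run()."""

    def __init__(self, name, deps=(), done=False):
        self.name = name
        self.deps = list(deps)
        self.done = done

    def requires(self):
        return self.deps

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, ran=None):
    """Make-style: satisfy dependencies first, skip tasks that are complete."""
    if ran is None:
        ran = []
    if task.complete():
        return ran  # already done, like an up-to-date Make target
    for dep in task.requires():
        build(dep, ran)
    task.run()
    ran.append(task.name)
    return ran


a = MiniTask("TaskA")
b = MiniTask("TaskB", done=True)  # pretend TaskB already ran earlier
my = MiniTask("MyTask", deps=[a, b])
print(build(my))  # ['TaskA', 'MyTask']
print(build(my))  # [] since everything is complete now
```

Re-running is cheap by design. Completed work is detected and skipped rather than redone.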

Small example

import luigi

class HelloWorldTask(luigi.Task):
    def run(self):
        with self.output().open('w') as fd:
            fd.write('Hello World')

    def output(self):
        return luigi.LocalTarget('hello.txt')

Let's run it!

$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (PENDING)
...
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 HelloWorldTask()

===== Luigi Execution Summary =====

$ cat hello.txt
Hello World

And let's run it again!

$ luigi --module helloworld HelloWorldTask --local-scheduler
...
INFO: Scheduled HelloWorldTask() (DONE)
...
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 HelloWorldTask()

Did not run any tasks

===== Luigi Execution Summary =====
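The second run skipped the task because HelloWorldTask does not override complete(). luigi's default complete() counts a task as done when all of its outputs exist, and hello.txt was already there. Below is a stand-alone sketch of that rule over plain file paths. The helper `outputs_complete` is hypothetical and is not part of luigi's API.

```python
import os
import tempfile


def outputs_complete(output_paths):
    """Hypothetical helper: a task is complete when every output exists."""
    if not output_paths:
        return False  # no outputs to check: this sketch treats it as not complete
    return all(os.path.exists(p) for p in output_paths)


with tempfile.TemporaryDirectory() as tmp:
    hello = os.path.join(tmp, "hello.txt")
    print(outputs_complete([hello]))  # False: the first run would call run()
    with open(hello, "w") as fd:
        fd.write("Hello World")
    print(outputs_complete([hello]))  # True: the second run is skipped
```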

Does it feel declarative?

Kind of. Even though class definitions in Python are executable statements, we've made using luigi feel declarative.

This feels like a statement in Python, not a declaration:

import sys

class MyNewClass(object):
    sys.exit(1)  # runs as soon as the class statement executes
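Here is a gentler version of the same point. The body of a class statement is ordinary code that runs once, when the class statement itself executes. The `events` list is just for illustration.

```python
events = []


class MyNewClass(object):
    # This line runs while the class statement executes,
    # before MyNewClass is ever instantiated.
    events.append("class body executed")


print(events)  # ['class body executed']
```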

So how does one achieve this declarative feel?

# my_tasks.py
import luigi

class MyTask(luigi.Task):
    def run(self): pass
    def complete(self): pass
    def requires(self): pass

$ luigi --module my_tasks MyTask

We need to bring out our Python-fu!

By having a registry!

# from the luigi library source code (abridged):
import abc
import six

class Register(abc.ABCMeta):
    def __new__(metacls, classname, bases, classdict):
        """ Is called when a Task class is "declared". """
        cls = super(Register, metacls).__new__(...)
        metacls._reg.append(cls)
        return cls

@six.add_metaclass(Register)
class Task(object):
    ...
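Here is a self-contained Python 3 sketch of the registry idea that does not depend on luigi. The metaclass records every class created with it, so just declaring a subclass makes it findable by name. That is roughly what a command like `luigi --module my_tasks MyTask` relies on. The `lookup` method is a hypothetical helper, and luigi's real Register does considerably more.

```python
import abc


class Register(abc.ABCMeta):
    """Metaclass recording every class created with it (simplified sketch)."""
    _reg = []

    def __new__(metacls, classname, bases, classdict):
        cls = super().__new__(metacls, classname, bases, classdict)
        metacls._reg.append(cls)  # registration happens at class-definition time
        return cls

    @classmethod
    def lookup(metacls, name):
        """Hypothetical helper: find a registered class by its name."""
        for cls in metacls._reg:
            if cls.__name__ == name:
                return cls
        raise KeyError(name)


class Task(metaclass=Register):
    pass


# Declaring the subclass is enough to register it; no explicit call is needed.
class MyTask(Task):
    def run(self):
        return "ran MyTask"


task_cls = Register.lookup("MyTask")  # what a CLI could do with the string "MyTask"
print(task_cls().run())  # ran MyTask
```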

Building an Open Source Community

Current (October 1) GitHub pages for orchestrators

Observations

  • Clearly, a community is not built just by being an Apache project.
  • However, being one attracts users and enterprise support!
    • Both Cloudera and Hortonworks support Oozie and include it in their Hadoop distribution.

At Spotify, we've debated whether to push Spotify Luigi to become Apache Luigi.

I guess Luigi's community success comes from:

  • Spotify being responsive on the mailing list and on GitHub PRs and issues
  • It being hosted on GitHub
  • Orchestration being a problem that many people have
  • Luigi taking only about 10 minutes to try out
  • The API being very intuitive

Significant contributions by non-Spotifiers

  • 100% of the Web UI
  • Most of the scheduling logic
  • Edge-case handling in the worker
  • Many storage and compute engine integrations
    • AWS, GCP, Redshift, Elasticsearch, SQLAlchemy, etc.

The Thanksgiving bug

Story time!

Reporting to labels at Spotify

When you listen to music on Spotify, we report this to labels and pay them.

How client logs are stored at Spotify

When you listen to songs, we save EndSong messages.

$ hadoop fs -ls /cleaned/endsong/2015-08-27/06/
/cleaned/endsong/2015-08-27/06/part-00000.avro
/cleaned/endsong/2015-08-27/06/part-00001.avro
/cleaned/endsong/2015-08-27/06/part-00002.avro
...

The data is split into multiple Avro files.

The mv command in Unix

What does this do?

$ mkdir my_directcry
$ mv my_file.txt my_directory
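Python's shutil.move documents the same rule as mv. If the destination is an existing directory, the source is moved inside it. Otherwise the source is renamed. That makes it a handy local stand-in for trying both cases. The `demo` helper is hypothetical.

```python
import os
import shutil
import tempfile


def demo(create_dir_first):
    """Hypothetical helper: move my_file.txt to my_directory, report the result."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "my_file.txt")
        dst = os.path.join(tmp, "my_directory")
        with open(src, "w") as fd:
            fd.write("data")
        if create_dir_first:
            os.mkdir(dst)
        shutil.move(src, dst)
        if os.path.isdir(dst):
            # dst existed as a directory, so the file went inside it
            return "moved inside: " + ", ".join(os.listdir(dst))
        # dst did not exist, so the file itself now has that name
        return "renamed to: " + os.path.basename(dst)


print(demo(create_dir_first=True))   # moved inside: my_file.txt
print(demo(create_dir_first=False))  # renamed to: my_directory
```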

What luigi did

$ hadoop jar mapreduce.jar -Poutput=output-luigi-tmp-123456
$ hadoop fs -mv output-luigi-tmp-123456 output
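Writing to a temporary name first means readers see either no output or the complete output, never a half-written one. Below is a local sketch of that pattern. os.rename stands in for `hadoop fs -mv`, which is an assumption for illustration, and the function name and temp suffix are hypothetical.

```python
import os
import tempfile


def publish_atomically(final_dir, part_contents):
    """Hypothetical helper: write parts into a temp dir, then rename it into place."""
    tmp_dir = final_dir + "-luigi-tmp-123456"  # hypothetical temporary name
    os.mkdir(tmp_dir)
    for i, content in enumerate(part_contents):
        with open(os.path.join(tmp_dir, "part-%05d" % i), "w") as fd:
            fd.write(content)
    # On POSIX a successful rename is atomic: final_dir appears all at once.
    os.rename(tmp_dir, final_dir)


with tempfile.TemporaryDirectory() as root:
    output = os.path.join(root, "output")
    publish_atomically(output, ["a", "b", "c"])
    listing = sorted(os.listdir(output))
    tmp_gone = not os.path.exists(output + "-luigi-tmp-123456")
    print(listing)  # ['part-00000', 'part-00001', 'part-00002']
```

There is one difference to keep in mind. On POSIX, os.rename silently replaces an empty destination directory and raises an error for a non-empty one. mv-style semantics would instead put the temporary directory inside an existing `output`.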

Now what can go wrong? ;)

An awesome fix I did! spotify/luigi#522

$ hadoop jar mapreduce.jar -Poutput=output-luigi-tmp-123456
$ hadoop fs -mkdir output
$ hadoop fs -mv output-luigi-tmp-123456/* output

The trick is that hadoop fs -mkdir output fails if the folder already exists. That's good, since then no other thread can create it. But...
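Here is one plausible way this goes wrong. It is an assumption, not something spelled out on the slides. Once mkdir has created `output`, a downstream job that only checks whether `output` exists can start reading while the part files are still being moved over one by one. The local simulation below uses os.mkdir and os.rename as stand-ins for the hadoop fs commands, and the reader callback is hypothetical.

```python
import os
import tempfile


def publish_per_file(tmp_dir, final_dir, on_each_move=None):
    """Sketch of the mkdir-then-move-each-file approach (local stand-in)."""
    os.mkdir(final_dir)  # raises FileExistsError if final_dir already exists
    for name in sorted(os.listdir(tmp_dir)):
        os.rename(os.path.join(tmp_dir, name), os.path.join(final_dir, name))
        if on_each_move:
            on_each_move()


seen = []

with tempfile.TemporaryDirectory() as root:
    tmp_dir = os.path.join(root, "output-luigi-tmp-123456")
    final_dir = os.path.join(root, "output")
    os.mkdir(tmp_dir)
    for i in range(5):
        with open(os.path.join(tmp_dir, "part-%05d" % i), "w") as fd:
            fd.write("row\n")

    def reader_peeks():
        # Hypothetical downstream reader: "output exists, so it must be done".
        if os.path.exists(final_dir) and not seen:
            seen.extend(os.listdir(final_dir))

    publish_per_file(tmp_dir, final_dir, on_each_move=reader_peeks)
    print(len(seen), "of 5 parts visible to the early reader")
```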

Just a week or so after Thanksgiving, an analyst came down to the Data Engineers.

Analyst: Hey, a label reported that they received only 80% of the expected revenue for last Thursday. We think there's a bug in your systems. If labels get pissed, we will all lose our jobs!

Engineer: Well duuh, it was Thanksgiving last Thursday. Of course people listen to less music! And you're supposed to be the guys working with user behaviors.

No animals were harmed

The "fix" was reverted in spotify/luigi#557, and a proper fix was implemented in spotify/luigi#605. It used a new file system primitive with rename-don't-move semantics, also contributed by Spotify in spotify/snakebite#128.
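The semantics can be sketched locally as a rename that refuses when the destination exists, instead of moving the source inside it. This is a hypothetical stand-in, not the snakebite API. Because it checks and then renames in two separate steps, it is not race-free the way a single filesystem primitive can be.

```python
import os
import tempfile


def rename_dont_move(src, dst):
    """Hypothetical: rename src to dst, but fail if dst already exists.

    The exists() check and the rename are two steps, so this local
    version is NOT atomic; it only illustrates the intended semantics.
    """
    if os.path.exists(dst):
        raise FileExistsError(dst)
    os.rename(src, dst)


refused = False
with tempfile.TemporaryDirectory() as root:
    tmp1 = os.path.join(root, "output-luigi-tmp-1")
    tmp2 = os.path.join(root, "output-luigi-tmp-2")
    output = os.path.join(root, "output")
    os.mkdir(tmp1)
    os.mkdir(tmp2)

    rename_dont_move(tmp1, output)  # the first writer wins
    try:
        rename_dont_move(tmp2, output)  # the second writer fails loudly
    except FileExistsError:
        refused = True
        print("second rename refused; output left untouched")
    final = sorted(os.listdir(root))
    print(final)  # ['output', 'output-luigi-tmp-2']
```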


Thanks for listening

@Tarrasch on GitHub

http://tarrasch.github.io/luigi-budapest-bi-oct-2015

https://github.com/Tarrasch/luigi-budapest-bi-oct-2015
