Demystifying Celery




On Github samgclarke / presentations

Demystifying Celery

Sam Clarke

Celery


Who I am

Sam Clarke

  • Technical Director @ Rival Schools
  • Lead Developer     @ Script Speaker

Why this talk?

  • New things are hard.
  • Distributed what?
  • It's not just Python?

Demystifying Celery

1. Introduction to Celery

2. Adding Celery to your application

3. Celery in Production

4. Logging and Monitoring

What is Celery?

  • Celery is written in Python.
  • Celery is a task queue.
  • Django-Celery is also a thing.

Do I need Celery?

  • Maybe you don't (your app is synchronous).
  • You have tasks which don't need to be executed synchronously.
  • You are configuring your application for performance where available resources are a concern.

Do I need Celery?

Introducing Celery into your application is a trade-off between performance and complexity.

How Does Celery work?

Brokers & Backends

Celery Brokers

  • Celery communicates with workers via messages; the broker is the intermediary that delivers them.
  • Recommended options are RabbitMQ, Redis.

Celery Backends

  • Celery can store task state (optional) using a results backend.
  • Common result backends include RabbitMQ, Redis, Memcached, SQL databases, Elasticsearch.

Workers & Concurrency

  • A worker is a Celery process, which can spawn child processes.
  • Specify number of worker processes/threads with the --concurrency argument, which defaults to the number of CPUs.
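
To make the roles concrete, here is a toy, in-process model of the same moving parts, using only the standard library: a `queue.Queue` stands in for the broker, a thread for the worker, and a dict for the result backend. All names (`send_task`, `worker_loop`, `results`, `TASKS`) are invented for illustration; this is a sketch of the idea, not how Celery is implemented.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for RabbitMQ/Redis
results = {}             # stands in for the result backend
TASKS = {"add": lambda x, y: x + y}  # registry: task name -> function


def send_task(name, *args):
    """Publish a message describing the call and return its task id."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker_loop():
    """Consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        func = TASKS[message["task"]]
        results[message["id"]] = func(*message["args"])


worker = threading.Thread(target=worker_loop)
worker.start()
task_id = send_task("add", 5, 7)   # returns immediately with an id
broker.put(None)                   # tell the toy worker to stop
worker.join()
print(results[task_id])
```

The caller only ever holds a task id; the value itself lives in the result store, which is the same split Celery makes between the broker and the (optional) result backend.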

Demystifying Celery

1. Introduction to Celery

2. Adding Celery to your application

3. Celery in Production

4. Logging and Monitoring

Install system packages

sudo apt-get install rabbitmq-server   # and/or redis-server
pip install celery                     # use "celery[redis]" when Redis is involved

Instantiating Celery

#  app.py

from celery import Celery

celery = Celery(
    'app',
    broker="amqp://guest@localhost//",
    backend="redis://localhost:6379/0"
)

#  Note: in production, use a dedicated user/vhost
#  with appropriate permissions for broker/backend

Using a config file

#  app.py

from celery import Celery

celery = Celery()
celery.config_from_object('celery_config')

#  celery_config.py

import os

#  note: setting names are lowercase as of version 4.0
broker_url = os.environ['BROKER_URL']
result_backend = os.environ['BACKEND_URL']
worker_pool_restarts = True
...

Starting Celery

celery -A app.celery worker -c 2

  • -A (--app): dotted path to the Celery instance (module.attribute)
  • -c (--concurrency): number of child processes

Example output from a Celery worker running locally.

Highlighted are the configuration and the registered tasks.

Registering Tasks

The Celery task decorator.

Regular Function

#  tasks.py

def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
    #  deliver_email is a placeholder for the helper that actually sends mail
    deliver_email(user.email, subject, body)

#  call function synchronously
send_email(user_id=1234)

Celery Task

#  tasks.py

from app import celery

@celery.task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
    #  deliver_email is a placeholder for the helper that actually sends mail
    deliver_email(user.email, subject, body)

#  call task asynchronously
send_email.delay(user_id=1234)

Don't pass state to tasks!

#  tasks.py

@celery.task
def send_email(user_id):
    user = User.objects.get(id=user_id)
    subject = 'Welcome, {}'.format(user.username)
    body = 'Hello from Celery'
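    #  deliver_email is a placeholder for the helper that actually sends mail
    deliver_email(user.email, subject, body)


#  why not just pass in the user object? It is serialized when the task
#  is queued, so it may be stale by the time a worker runs the task.
send_email.delay(user_id=1234)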
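
One way to see the problem, sketched in plain Python with no Celery involved: task arguments are serialized when the task is queued, so an object passed in is a snapshot from that moment. The `FAKE_DB` dict and both function names are invented for this illustration.

```python
import copy

FAKE_DB = {1234: {"username": "foo", "email": "foo@bar.com"}}


def task_with_object(user):
    # Works on whatever was serialized at enqueue time.
    return user["email"]


def task_with_id(user_id):
    # Looks the user up when the task actually runs.
    return FAKE_DB[user_id]["email"]


# Enqueue: the message carries either a copy of the object or just the id.
queued_object = copy.deepcopy(FAKE_DB[1234])
queued_id = 1234

# The user changes their address before a worker picks the task up.
FAKE_DB[1234]["email"] = "new@bar.com"

print(task_with_object(queued_object))  # stale address
print(task_with_id(queued_id))          # current address
```

Passing the id also keeps the message small and trivially serializable.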

Getting Results

from app import celery

@celery.task
def add(x, y):
    return x + y

#  result is an AsyncResult, not 12: the task has only been queued
result = add.delay(5, 7)

Getting Results

from app import celery

@celery.task
def add(x, y):
    return x + y

#  store a reference to task AsyncResult
async_result = add.delay(5, 7)

#  get async result using the task's AsyncResult method
result = add.AsyncResult(async_result.id)

if result.ready():
    value = result.get()
else:
    ...  #  not finished yet: do something else

Useful AsyncResult methods

  • .get() - blocks until ready. Returns the result, or re-raises the task's exception.
  • .successful() - returns True if the task executed successfully.
  • .ready() - returns True if the task has finished (successfully or not).
  • .revoke() - workers will ignore the task. If terminate=True, a task that is already executing is killed.

Getting Task State

Built-in task states:

PENDING, STARTED, RETRY, SUCCESS, FAILURE or REVOKED. (STARTED is only reported when task_track_started is enabled.)

result = add.AsyncResult(async_result.id)

if result.state == 'SUCCESS':
    #  do something with the result
    value = result.get()
else:
    #  do something else
    ...

Custom States

Sometimes you need more detailed state messages.

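#  Report task-specific progress (e.g. how much of a file has been
#  processed) that Celery does not track itself.
#  'PROGRESS' is our custom state.

...
percent_done = self._get_progress()
#  inside a bound task (bind=True), self is the task instance
self.update_state(
    state='PROGRESS',
    meta={'progress': percent_done}
)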
...
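
On the calling side, the `meta` dict passed to `update_state` is what an `AsyncResult` exposes as `.info` while the task is in that custom state. Below is a small sketch of turning that into a status line. `describe_progress` is a hypothetical helper, written against plain values so it runs without a broker.

```python
def describe_progress(state, info):
    """Return a human-readable status line for a task.

    `state` and `info` are meant to be `result.state` and `result.info`
    taken from an AsyncResult.
    """
    if state == "PROGRESS" and isinstance(info, dict):
        return "{}% done".format(info.get("progress", 0))
    if state == "SUCCESS":
        return "finished: {!r}".format(info)
    if state == "FAILURE":
        # For failed tasks, info holds the raised exception.
        return "failed: {}".format(info)
    return state.lower()


print(describe_progress("PROGRESS", {"progress": 40}))
print(describe_progress("PENDING", None))
```

In application code this would be called as `describe_progress(result.state, result.info)`.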

Keeping state outside of Celery

Here we don't rely on Celery's task status to let us know whether the task completed successfully.

user = User(
    username='foo', email='foo@bar.com', email_sent=False)


#  Note that we tell Celery to
#  ignore the result
@celery.task(ignore_result=True)
def send_email(user_id):
    user = User.objects.get(id=user_id)
    ...
    #  email sent, record state in User profile
    user.email_sent = True
    user.save()

Keeping state outside of Celery

This blog post: http://bit.ly/2eqd1DZ

Relying on Celery

  • task_acks_late - workers acknowledge task after execution, not before.
  • Can increase reliability for idempotent tasks.
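
With late acknowledgement, a message whose worker dies mid-task is delivered again, so the task body must tolerate running twice. The sketch below pairs the setting with a toy idempotent function. `worker_prefetch_multiplier = 1` is a commonly paired setting rather than something from this talk, and `sent_flags`, `outbox` and `send_welcome_once` are invented stand-ins for a database column, a mail server and a task.

```python
# celery_config.py (fragment)
task_acks_late = True            # acknowledge after the task finishes
worker_prefetch_multiplier = 1   # reserve one message at a time per process

# Toy illustration of idempotency, independent of Celery.
sent_flags = {}   # user_id -> bool, stands in for a database column
outbox = []       # records every "email" actually sent


def send_welcome_once(user_id):
    """Safe to run more than once: the second run is a no-op."""
    if sent_flags.get(user_id):
        return
    outbox.append(user_id)
    sent_flags[user_id] = True


# Simulate the same message being delivered twice after a worker crash.
send_welcome_once(1234)
send_welcome_once(1234)
print(len(outbox))
```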

More complex workflows

  • Celery Canvas lets you combine tasks in interesting ways using signatures.
  • A signature is a serializable representation of a task invocation.
  • Useful primitives include: group, chain*, chord, map.

Chaining Tasks

  • Task signatures can be passed to other tasks.
  • Tasks can be linked together: the linked task is called when the task returns successfully.
  • A chain runs multiple tasks in series, so tasks can be composed instead of one task blocking on another's result.

Chaining Tasks

.s() - shorthand for .signature()

from celery import chain

#  the second task takes result of first task and 10 as args
async_result = chain(
    add.s(5, 7),
    add.s(10)).delay()

#  this is the result of the chain (final task)
result_two = add.AsyncResult(async_result.id).result

#  we can traverse the task tree to get previous results
result_one = add.AsyncResult(async_result.parent.id).result
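
The argument passing in a chain can be modelled without Celery. In the sketch below, `run_chain` is a made-up helper that takes `(function, args)` pairs and prepends each result to the next step's own arguments, which is the behaviour the partial signature `add.s(10)` relies on above.

```python
def add(x, y):
    return x + y


def run_chain(*steps):
    """Run (func, args) pairs in series, feeding each result forward.

    The previous result is prepended to the next step's own arguments,
    mirroring how a partial signature such as add.s(10) is completed.
    """
    results = []
    previous = ()
    for func, args in steps:
        value = func(*previous, *args)
        results.append(value)
        previous = (value,)
    return results


result_one, result_two = run_chain((add, (5, 7)), (add, (10,)))
print(result_one, result_two)
```

Here the first step computes 5 + 7, and the second receives that 12 as its first argument alongside 10.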

Testing Celery Tasks

You probably don't want to wait for your unit tests to execute asynchronously.

Testing strategies

  • Use Celery's eager mode: task_always_eager (CELERY_ALWAYS_EAGER before 4.0).
  • Use task.apply() instead of task.delay(); it runs the task locally and synchronously.
  • django-celery has its own CeleryTestSuiteRunner.
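
In eager mode, calls to `.delay()` execute in the calling process instead of going through the broker. With the lowercase setting names used since 4.0, a test-only configuration might look like the fragment below. The file name `test_settings.py` is an assumption; load it however your project loads its Celery config.

```python
# test_settings.py (fragment): hypothetical module used only by the test run
task_always_eager = True       # .delay()/.apply_async() execute locally, in-process
task_eager_propagates = True   # exceptions raised by eager tasks reach the test
```

Eager mode skips the broker entirely, so it checks task logic rather than the full round trip.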

Demystifying Celery

1. Introduction to Celery

2. Adding Celery to your application

3. Celery in Production

4. Logging and Monitoring

Daemonizing Celery

  • You need to be able to run Celery workers in the background and restart them automatically.
  • Options for daemonizing include init scripts, Upstart*, supervisord.

Daemonizing Celery using Upstart

#  /etc/init/celeryd.conf
description "celeryd"

start on runlevel [2345]
stop on runlevel [!2345]

setuid celery
setgid celery

respawn

chdir /path/to/app

script
    . /etc/environment
    export PATH=/path/to/.env/bin:$PATH
    exec celery -A app.celery worker -c 4 -n worker1.%h
end script

Demystifying Celery

1. Introduction to Celery

2. Adding Celery to your application

3. Celery in Production

4. Logging and Monitoring

Logging

  • Celery has its own logging handler.
  • By default the worker hijacks the root logger, so your regular log handlers get swallowed. Set worker_hijack_root_logger = False (CELERYD_HIJACK_ROOT_LOGGER before 4.0) to keep them.
  • You can also set up per-task logging.
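
A minimal sketch of keeping your own handlers, using `worker_hijack_root_logger`, the lowercase 4.0-style name for CELERYD_HIJACK_ROOT_LOGGER. The format string and the `"app"` logger name are illustrative choices, not anything Celery requires.

```python
# celery_config.py (fragment)
import logging

# Ask the worker to leave handlers on the root logger alone.
worker_hijack_root_logger = False

# Application logging set up the usual way; the format string is illustrative.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("app").info("application logging configured")
```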

Logging

import logging
from celery.utils.log import get_task_logger

#  Normal logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)


#  CELERY logging
#  create a common logger for all of your tasks
celery_logger = get_task_logger(__name__)

@celery.task(ignore_result=True)
def send_email(user_id):
    ...
    celery_logger.info(
        'Email Sent to {}'.format(user.username))

Monitoring with Flower

  • Web interface for monitoring Celery.
  • Runs on port 5555 by default.

Installing Flower

pip install flower
celery -A app.celery flower

Task Priority

"RabbitMQ supports priorities since version 3.5.0, and the Redis transport emulates priority support."

In production, it is best to route high-priority tasks to dedicated workers.
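
Routing is configured with the `task_routes` setting. The fragment below is a sketch in which the queue names and the task name `tasks.send_email` are placeholders for your own.

```python
# celery_config.py (fragment); task and queue names are placeholders
task_default_queue = "default"
task_routes = {
    "tasks.send_email": {"queue": "high_priority"},
}

# A dedicated worker would then consume only that queue, for example:
#   celery -A app.celery worker -Q high_priority -c 2 -n priority@%h
```

Tasks without an entry in `task_routes` go to the default queue, so a slow backlog there cannot delay the dedicated workers.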

Celery Alternatives

  • Huey - https://github.com/coleifer/huey
  • RQ - http://python-rq.org/

Take Aways

  • Don't block. If you need to wait for the result, it's not asynchronous.
  • Use RabbitMQ as broker. Redis is a good backend.
  • Don't send objects to tasks. State can change.
  • Use the Celery Logger inside tasks.

Do I need Celery?

Even though it adds complexity, Celery is still one of my go-to tools for any new web app.

Even apps of modest size can benefit from a task queue.

Thank you

@samclarkeg

github.com/samgclarke
