Sam Clarke
Sam Clarke
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
Introducing Celery into your application is trade-off between performance and complexity.
Brokers & Backends
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
sudo apt-get install rabbitmq-server (redis)
pip install celery
# app.py from celery import Celery celery = Celery( 'app', broker="amqp://guest@localhost//", backend="redis://localhost:6379/0" ) # Note: in production, use a dedicated user/ vhost # with appropriate permissions for broker/ backend
# app.py from celery import Celery celery = Celery() celery.config_from_object('celery_config')
# celery_config.py import os # note settings are lower case since version >= 4.0 broker_url = os.environ['BROKER_URL'] result_backend = os.environ['BACKEND_URL'] worker_pool_restarts = True ...
celery worker -A app.celery -c 2
Example output from a Celery worker running locally.
Highlighted are the configuration and the registered tasks.
The Celery task decorator.
# tasks.py def send_email(user_id): user = User.objects.get(id=user_id) subject = 'Welcome, {}'.format(user.username) body = 'Hello from Celery' send_email(user.email, subject, body) # call task synchronously send_email(user_id=1234)
# tasks.py from app import celery @celery.task def send_email(user_id): user = User.objects.get(id=user_id) subject = 'Welcome, {}'.format(user.username) body = 'Hello from Celery' send_email(user.email, subject, body) # call task asynchronously send_email.delay(user_id=1234)
# tasks.py @celery.task def send_email(user_id): user = User.objects.get(id=user_id) subject = 'Welcome, {}'.format(user.username) body = 'Hello from Celery' send_email(user.email, subject, body) # why not just pass in the user object? send_email.delay(user_id=1234)
from app import celery @celery.task def add(x, y): return x + y # This is not what you may expect... result = add.delay(5, 7)
from app import celery @celery.task def add(x, y): return x + y # store a reference to task AsyncResult async_result = add.delay(5, 7) # get async result using the task's AsyncResult method result = add.AsyncResult(async_result.id) if result.ready(): return result.get() else: # do something else
Possible default task states:
PENDING, STARTED, SUCCESS, FAILURE or REVOKED.
result = add.AsyncResult(async_result.id) if result.state == 'SUCCESS': # do something with the result return result.get() else: # do something else ...
Sometimes you need more detailed state messages.
# This task specific status (filesize) external to Celery # 'PROGRESS' is our custom state ... percent_done = self._get_progress() task.update_state( state='PROGRESS', meta={'progress': percent_done} ) ...
Here we don't rely on Celery's task status to let us know whether the task completed successfully.
user = User( username='foo', email='foo@bar.com', email_sent=False) # Note that we tell Celery to # ignore the result @celery.task(ignore_result=True) def send_email(user_id): user = User.objects.get(id=user_id) ... # email sent, record state in User profile user.email_sent = True user.save()
This blog post: http://bit.ly/2eqd1DZ
.s() - shorthand for .signature()
from celery import chain # the second task takes result of first task and 10 as args async_result = chain( add.s(5, 7), add.s(10)).delay() # this is the result of the chain (final task) result_two = add.AsyncResult(async_result.id).result # we can traverse the task tree to get previous results result_one = add.AsyncResult(async_result.parent.id).result
You probably don't want to wait for your unit tests to execute asynchronously.
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
description "celeryd" start on runlevel [2345] stop on runlevel [!2345] setuid celery setgid celery respawn chdir /path/to/app script . /etc/environment export PATH=/path/to/.env/bin exec celery worker -A app.celery -c 4 -n worker1.%h end script
1. Introduction to Celery
2. Adding Celery to your application
3. Celery in Production
4. Logging and Monitoring
import logging from celery.utils.log import get_task_logger # Normal logging handler = logging.StreamHandler() logger.addHandler(handler) # CELERY logging # create a common logger for all of your tasks celery_logger = get_task_logger(__name__) @celery.task(ignore_result=True) def send_email(user_id): ... celery_logger.info( 'Email Sent to {}'.format(user.username))
pip install flower
celery flower -A app.celery
"RabbitMQ supports priorities since version 3.5.0, and the Redis transport emulates priority support."
In Production, it is best to route high-priority tasks to dedicated workers.
Even though it adds complexity, Celery is still one of my go-to tools for any new web app.
Even apps of modest size can benefit from a task queue.
@samclarkeg
github.com/samgclarke