
Django, Celery, RabbitMQ tutorial

The Big Picture

Use cases

You primarily use Celery to:
1) keep time-consuming jobs from blocking the request-response cycle,
2) schedule tasks to run at a specific time, and
3) manage tasks that may need to be retried.

Some common use cases:
1) sending emails
2) rebuilding search indexes when items in the indexed models are added, modified, or deleted
3) running CPU-intensive tasks such as image and video processing
4) running tasks that are prone to failure and therefore might require retries.

Installation & Setup

I am assuming that you have a Django app up and running. If not, you must first set up a Django project.

Celery is written in Python, so we can install it with pip:

pip install celery

I installed RabbitMQ from the Ubuntu repository:

sudo apt-get install rabbitmq-server

Please follow the RabbitMQ installation instructions for your operating system on the official RabbitMQ site.

The RabbitMQ service starts automatically upon installation. You can manually start the server by running the following command on the command line.

rabbitmq-server
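
You can confirm the broker is up with rabbitmqctl:

sudo rabbitmqctl status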

Add Celery to Your Django Project

Create a file named `celery.py` adjacent to your Django `settings.py` file.
This file will contain the Celery configuration for our project. Add the following code to the file:

import os
from celery import Celery
from django.conf import settings

# Ensure Django settings are configured before the Celery app reads them.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yourproject.settings')

app = Celery('yourproject')

# Read any CELERY_-prefixed settings from the Django settings module.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Look for a tasks.py module in every installed application.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

The code above creates a Celery application instance for our project; the `namespace='CELERY'` argument tells it to read any Django setting prefixed with `CELERY_`. The last line instructs Celery to auto-discover asynchronous tasks for all the applications listed under `INSTALLED_APPS`. Celery will look for task definitions in a file named `tasks.py` in each application directory.
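
Celery also needs to know where RabbitMQ lives. It defaults to an AMQP broker on localhost, which matches a stock RabbitMQ install, but it does no harm to be explicit. A minimal sketch for `settings.py`, assuming RabbitMQ's default `guest` account on the local machine; the result backend is needed because we fetch task return values with `.get()` later, and `rpc://` sends results back over RabbitMQ itself:

CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# Required for the .get() calls shown later in this tutorial.
CELERY_RESULT_BACKEND = 'rpc://'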

Next, open the `__init__.py` file in the project package (the same directory as `settings.py`) and add the following code to it:

from .celery import app as celery_app

__all__ = ('celery_app',)

This ensures that the Celery application defined above is loaded whenever Django starts.

Build Celery Tasks

Since Celery looks for asynchronous tasks in a file named `tasks.py` within each application, create a `tasks.py` file in every application that needs to run asynchronous tasks.

In order for Celery to identify a function as a task, it must be wrapped in a task decorator; `@shared_task` from the `celery` package is the idiomatic choice, since it works without importing the app instance directly.

import time

from celery import shared_task

@shared_task(ignore_result=True)
def send_email(recipient, title, subject):
    print('sending email')

@shared_task
def rebuild_search_index():
    time.sleep(500)  # mimicking a long-running process
    print('rebuilt search index')
    return 42

The first task does not return any useful value, so it is declared with ignore_result=True. The second task is a long-running process that returns a value we will use for subsequent updates, so we leave ignore_result unset.
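
The retry use case from earlier hangs off the same decorator. A minimal sketch, assuming a hypothetical `deliver()` helper that may raise `ConnectionError` on transient failures; `bind=True` gives the task access to `self.retry()`:

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_email_with_retry(self, recipient, title, subject):
    try:
        deliver(recipient, title, subject)  # hypothetical helper that may fail
    except ConnectionError as exc:
        # Re-queue the task; Celery gives up after max_retries attempts.
        raise self.retry(exc=exc)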

Next, let’s spawn some workers. Run these from the directory containing `manage.py`:

celery -A yourproject worker -n worker_one -l info &
celery -A yourproject worker -n worker_two -l info &
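
You can confirm that both workers came up and registered with the broker using Celery's built-in status command:

celery -A yourproject status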

Executing the tasks asynchronously

Now that we have defined asynchronous tasks, we can execute them anywhere in Django by calling the `delay()` method.

send_email.delay(recipient, title, subject)
result = rebuild_search_index.delay()

Since we used the delay method, Celery hands the function off to a worker. The task is added to the queue and executed in a non-blocking fashion, so even time-consuming calls return immediately. Note that `delay()` returns an `AsyncResult` handle, not the task's return value.
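
To see how this keeps the request-response cycle unblocked, here is a minimal sketch of a hypothetical Django view that queues the email task and returns at once:

from django.http import HttpResponse

from .tasks import send_email

def signup_complete(request):
    # Queue the email and return immediately; a worker does the sending.
    send_email.delay('user@example.com', 'Welcome', 'Thanks for signing up')
    return HttpResponse('Check your inbox!')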

To check whether a task has completed, call `.ready()` on the `AsyncResult` returned by `delay()`. If it returns True, the task has finished executing and we can fetch its return value with `.get()` as follows:

if result.ready():
    index_count = result.get()

You can also call `.get()` directly without testing with `.ready()` first, but in that case you must pass a timeout so that your program isn’t left waiting indefinitely for the result, which would defeat the purpose of our implementation:

index_count = result.get(timeout=1)

This raises `celery.exceptions.TimeoutError` if the result does not arrive in time, which can be handled accordingly.
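
For instance, a short sketch that polls for the result without stalling the caller:

from celery.exceptions import TimeoutError

try:
    index_count = result.get(timeout=1)
except TimeoutError:
    index_count = None  # not ready yet; check again later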

Monitoring Celery Workers

There is a handy web-based tool called Flower which can be used for monitoring and administering Celery clusters. Flower provides detailed statistics of task progress and history, and shows other task details such as the arguments passed, start time, and runtime.

Use the pip command to install Flower.

pip install flower

Once installed, launch Flower from the command line in your project directory:

celery -A yourproject flower

The dashboard can then be viewed by visiting http://localhost:5555/dashboard in your browser.