Python Celery: Writing custom Tasks


If you landed here while searching for something, you probably know what I am going to talk about. For those who don't, Celery is an asynchronous task queue, written in Python, based on distributed message passing. Quoting from its homepage, “Celery is used in production systems to process millions of tasks a day”. In many practical use cases you will be better off writing custom task classes rather than decorating functions as tasks. This not only gives you fine-grained control over task execution but also helps modularize your codebase. In this post, I will describe how to set up and create a custom task in Celery.

Let’s begin by creating an empty celery project. Create a folder with the following directory structure:

celery_app/
├── __init__.py
├── celery.py
├── celeryconfig.py
└── tasks.py

In the above structure, celeryconfig.py holds the configuration, celery.py creates the Celery app, and tasks.py contains our custom task. To begin with, fill in the contents of celery.py:

celery.py
from celery import Celery

app = Celery()
app.config_from_object('celery_app.celeryconfig')

if __name__ == "__main__":
    app.start()
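
The app above loads its settings from celery_app/celeryconfig.py. As a rough sketch, that module could hold a handful of lowercase settings like the ones below. The broker and backend URLs are assumed placeholders for a local Redis server, not values taken from this project.

```python
# celeryconfig.py: a minimal sketch; every value here is an assumed placeholder.

# Where workers fetch tasks from (assumes a local Redis server).
broker_url = "redis://localhost:6379/0"

# Where task results are stored (only matters if results are not ignored).
result_backend = "redis://localhost:6379/1"

# Exchange task messages as JSON only.
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Keep timestamps in UTC.
timezone = "UTC"
enable_utc = True
```

Keeping the settings in their own module means the app definition in celery.py does not change when you move between environments.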

You can leave celeryconfig.py empty or populate it with your configuration. Next, we start writing our tasks.py:

from celery_app.celery import app

class Register(app.Task):
    "Task to register a new user"

The advantage of subclassing app.Task is that we can write the body of the task and also add code to handle events such as success, failure, and retry. This lets us log specific events, write callbacks for failed tasks, and so on. The celery.app.task.Task class has several methods, but we will focus on the three important ones: run, on_success and on_failure.

Let’s suppose we are building the backend of a user authentication system. For this purpose, we have to write code that validates the user data and logs the user in. If the task receives the JSON-encoded form data as its payload, we can define the run method as follows:

    def run(self, payload):
        "Function to process and validate payload data"
        # Requires `import json` at the top of tasks.py
        user_data = json.loads(payload)
        # Code for validation of payload goes here
        # We do not want the task to return anything,
        # hence no return.
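
The validation step is left open above. As one possible shape for it, here is a small sketch of a helper that run could call. The function name validate_payload and the required field names are hypothetical, chosen only for illustration.

```python
import json

REQUIRED_FIELDS = ("email", "password")  # hypothetical field names


def validate_payload(payload):
    """Decode a JSON payload and check that the required fields are present.

    Returns the decoded dict, or raises ValueError if the payload is unusable.
    """
    try:
        user_data = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValueError(f"payload is not valid JSON: {exc}") from exc

    if not isinstance(user_data, dict):
        raise ValueError("payload must be a JSON object")

    # Treat absent and empty values the same way.
    missing = [field for field in REQUIRED_FIELDS if not user_data.get(field)]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    return user_data
```

Raising an exception on bad input matters here: an exception escaping run is what marks the task as failed, which in turn is what triggers the failure callback.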

Next, we move on to defining our custom callbacks. You can think of this functionality as a try-catch block in traditional programming. First, we define on_success, which holds the logic for when the run body executes successfully, i.e. when the payload is validated. We may want to send a notification email to the user, log the login event in a database, etc. The input parameters for this method are retval (the return value, which is None here since we return nothing), a unique task_id, args (a tuple containing the original payload), and kwargs (any keyword arguments you supplied). For the moment, let’s skip the kwargs parameter. Our function signature will then be as follows:

    def on_success(self, retval, task_id, args, kwargs):
        pass  # logic for a successful run goes here

As an example of a keyword argument, we can keep track of how many times the user has tried to log in with a retry parameter that defaults to 0. We will also create an on_failure callback to hold the logic for unsuccessful login events. Below is the signature of on_failure:

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass  # logic for a failed run goes here
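
To make the try-catch analogy concrete, here is a toy model in plain Python of how the three methods relate. MiniTask and its execute method are hypothetical names; this is not how Celery is implemented, only a sketch of the control flow under that analogy.

```python
class MiniTask:
    """Toy stand-in for a task class; not Celery's real implementation."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def on_success(self, retval, task_id, args, kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

    def execute(self, task_id, args=(), kwargs=None):
        """Hypothetical driver: call run, then dispatch to the matching callback."""
        kwargs = kwargs or {}
        try:
            retval = self.run(*args, **kwargs)
        except Exception as exc:
            # einfo normally carries traceback details; None keeps the sketch short.
            self.on_failure(exc, task_id, args, kwargs, None)
            return None
        self.on_success(retval, task_id, args, kwargs)
        return retval


class Divide(MiniTask):
    def __init__(self):
        self.events = []

    def run(self, a, b):
        return a / b

    def on_success(self, retval, task_id, args, kwargs):
        self.events.append(("success", retval))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self.events.append(("failure", type(exc).__name__))


task = Divide()
task.execute("id-1", args=(6, 3))
task.execute("id-2", args=(1, 0))
print(task.events)  # [('success', 2.0), ('failure', 'ZeroDivisionError')]
```

Note that the callbacks receive the same args and kwargs that run was called with, which is how a callback can recover the original payload.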

Before finishing up, we will add the code that keeps track of unsuccessful logins via the retry parameter and creates a new task with an updated retry value. We can enforce the maximum number of incorrect logins in the run method itself. Notice how on_failure sends a new task with the same payload and an incremented retry value. In this way, we can retry a task manually. The complete code is shown below:

import json
from celery_app.celery import app

class Login(app.Task):
    "Class to login a user"
    name = "login_task"
    ignore_result = True

    def run(self, payload, retry=0):
        "Function to process and validate payload data"
        if retry >= 3:
            # send_to_user is a placeholder helper defined elsewhere
            send_to_user("You are prevented from logging in for 10 mins!")
            return

        # Validate user_data if retry within
        # permitted limit; raise an exception on
        # invalid data so that on_failure is called
        user_data = json.loads(payload)

    def on_success(self, retval, task_id, args, kwargs):
        # Notify user with email
        payload = json.loads(args[0])
        self.notify_via_email(payload["email"])

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Log unsuccessful login
        raw_payload = args[0]
        payload = json.loads(raw_payload)
        # retry is absent from kwargs on the first attempt
        retry = kwargs.get("retry", 0)
        print("Incorrect Login attempt from IP: ", payload["ip"])
        self.log("error", payload)
        # Pass retry as a keyword so the next failure can read it from kwargs
        app.send_task("login_task", args=(raw_payload,),
                      kwargs={"retry": retry + 1})

# register the task
# (on Celery 4 and later, app.register_task(Login()) is the documented way)
app.tasks.register(Login)
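
To see why this manual retry chain terminates, it can help to simulate it without a broker. The sketch below replaces the message queue with a deque and the task with plain functions; send_task, process, and check_password are hypothetical stand-ins, and the hard-coded password exists only to force failures.

```python
import json
from collections import deque

MAX_ATTEMPTS = 3  # same limit as the run method above

queue = deque()   # stand-in for the message broker
log = []          # records what happened, in order


def send_task(payload, retry=0):
    """Stand-in for app.send_task: enqueue the payload with its retry count."""
    queue.append((payload, retry))


def check_password(user_data):
    """Hypothetical validation: only one hard-coded password is accepted."""
    return user_data.get("password") == "correct-horse"


def process(payload, retry):
    """Mirror of run plus on_failure: lock out, succeed, or resubmit."""
    if retry >= MAX_ATTEMPTS:
        log.append("locked out")
        return
    if check_password(json.loads(payload)):
        log.append("logged in")
        return
    log.append(f"failed attempt {retry + 1}")
    send_task(payload, retry + 1)  # same payload, incremented counter


send_task(json.dumps({"email": "a@example.com", "password": "wrong"}))
while queue:
    process(*queue.popleft())

print(log)
# ['failed attempt 1', 'failed attempt 2', 'failed attempt 3', 'locked out']
```

The counter travels with each message, so no shared state is needed between workers. The early return once the limit is reached is what stops the chain from resubmitting forever.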

Bonus: Connecting to signals

Celery offers various signals that allow applications to receive notifications when certain events occur. Signals are especially helpful if you plan to run some code before or after tasks are created, run, or retried. In one of my use cases, I needed to connect to a fluentd logger for custom logging into a MongoDB database. Although signals can be used as decorators, I find the method below clean and flexible. All it needs is the following addition to celery.py:

from celery.signals import after_setup_logger, after_setup_task_logger
from fluent import handler  # from the fluent-logger package


def after_setup_logger_handler(sender=None, logger=None, loglevel=None,
                               logfile=None, format=None,
                               colorize=None, **kwds):
    custom_format = {
        'host': '%(hostname)s',
        'where': '%(module)s.%(funcName)s',
        'type': '%(levelname)s',
        'stack_trace': '%(exc_text)s'
    }
    # Replace the placeholder with your own FluentHandler arguments
    h = handler.FluentHandler(**your params**)
    formatter = handler.FluentRecordFormatter(custom_format)
    h.setFormatter(formatter)
    logger.addHandler(h)


after_setup_task_logger.connect(after_setup_logger_handler)
after_setup_logger.connect(after_setup_logger_handler)
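
To try out a setup handler like this without a running Fluentd daemon, one option is to call it directly with an ordinary logger. The sketch below is a stand-alone variant that swaps FluentHandler for a standard logging.StreamHandler writing to an in-memory buffer. DictFormatter is a hypothetical stand-in for FluentRecordFormatter, and the substitution is purely for illustration.

```python
import io
import json
import logging


class DictFormatter(logging.Formatter):
    """Hypothetical stand-in for FluentRecordFormatter: one JSON object per record."""

    def __init__(self, fields):
        super().__init__()
        self.fields = fields  # maps output key -> %-style template

    def format(self, record):
        # Populate record.message so that %(message)s can be resolved.
        record.message = record.getMessage()
        return json.dumps({key: template % record.__dict__
                           for key, template in self.fields.items()})


buffer = io.StringIO()  # captures output instead of sending it to Fluentd


def after_setup_logger_handler(sender=None, logger=None, loglevel=None,
                               logfile=None, format=None,
                               colorize=None, **kwds):
    custom_format = {
        "where": "%(module)s.%(funcName)s",
        "type": "%(levelname)s",
        "message": "%(message)s",
    }
    h = logging.StreamHandler(buffer)
    h.setFormatter(DictFormatter(custom_format))
    logger.addHandler(h)


# Call the handler the way a signal would: with keyword arguments only.
demo_logger = logging.getLogger("demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
after_setup_logger_handler(logger=demo_logger)

demo_logger.warning("disk almost full")
entry = json.loads(buffer.getvalue())
print(entry["type"], entry["message"])  # WARNING disk almost full
```

Because signal receivers are called with keyword arguments, the handler can be exercised in isolation like this before it is connected to the real signals.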

I hope you found this article useful. Comments, suggestions, and criticisms are welcomed with an open heart.
