Today will be a brief blog about using a Celery and Redis to schedule regular tasks!
As we developed our application, we started to wish we had some way of automating checks regularly for certain things. This included ensuring users were validated, or when sending emails to users with their matches when the partner matcher deadline was reached. We first of all looked into cron jobs as we were both familiar with this. However, considering our development environment were in Windows, which did not natively support these solutions such as django-crontab, we looked for better options.
Ultimately, Windows was pretty limiting after trying several other options such as django-apscheduler and even third-party crontab handlers. We wanted a local, efficient solution. That is when we came across Celery alongside its beat and worker servers. The setup was involved following the documentation. This solution ran on a Redis serves so we needed to start up a WSL instance on our local machines to test it. Our home lab that ran the main branch was on Debian, so this was not a problem for in-production deployment.
After installing redis-server and ensuring redis works on a WSL instance with redis-cli ping
responding with “PONG”, we began celery beat server and worker server to send regular functions & receive them respectively:
celery -A collegeguide beat --loglevel=info
celery -A collegeguide worker --loglevel=info
Following the setup, we had the default celery.py
in our backend, initiated in the __init__.py
of our Django app. We have added django_celery_beat to our installed apps in settings.py
and had these tasks running:
# CELERY SETTINGS SETUP
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULE = {
'test-print': {
'task': 'api.tasks.test_print', # essentially a ping test
'schedule': 30.0, # 30 seconds
},
'send-match-notifications-every-minute': {
'task': 'api.tasks.send_match_notifications', # match partners, archive queue & email top matches
'schedule': 60.0, # every 1 minute
},
'delete-unverified-users': {
'task': 'api.tasks.delete_unverified_users', # remove unverified users (over 24 hours)
'schedule': 300.0, # every 5 minutes
},
}
With the respective functions located in our tasks.py
in our api backend:
from celery import shared_task
from django.utils.timezone import now
from datetime import timedelta
from django.contrib.auth import get_user_model
from api.models import MatcherQueue
from api.utils import match_and_email
'''
ALL SHARED TASKS FOR CELERY TO SCHEDULE EXECUTE
'''
@shared_task
def send_match_notifications():
"""
Celery task to send match notifications for a given matcher queue.
"""
queues = MatcherQueue.objects.filter(match_date__lte=now(), notification_sent=False)
for queue in queues:
try:
match_and_email(queue.id) # Find matches & send email
queue.notification_sent = True # Used to archive queue & prevent further matching.
queue.save()
print(f"Notifications sent for queue: {queue.id}")
return "send_match_notification executed successfully"
except Exception as e:
print(f"Error processing queue {queue.id}: {e}")
@shared_task
def delete_unverified_users():
"""
Deletes users who haven't verified their email within 24 hours.
"""
time_threshold = now() - timedelta(hours=24)
unverified_users = get_user_model().objects.filter(is_verified=False, created_at__lte=time_threshold)
count = unverified_users.delete()
print(f"Deleted {count[0]} unverified users.")
return "delete_unverified_users executed successfully"
@shared_task
def test_print():
return "test_print executed successfully"
The celery worker output looked as follows, meaning we had successfully set this up. All that was left was to make sure this work in the home server (which it does), and we had a fully functional scheduled function execution!