Why Python SaaS Applications Need Background Task Queues
Any SaaS application doing real work — sending emails, processing webhooks, generating reports, running AI inference, syncing data — cannot do those things synchronously inside a web request. A user hits an endpoint, your server sends a WhatsApp message, and they wait 3 seconds for a response. That’s not acceptable. Celery with Redis is the standard solution for Python SaaS: offload slow work to background workers, respond instantly, and process at scale.
This is the exact stack powering Messenjo — every WhatsApp message sent, every broadcast campaign, every scheduled automation runs through Celery workers backed by Redis. Here’s how to build it correctly from the start.
The Core Architecture
FastAPI (HTTP request) → Redis Broker → Celery Workers
↓ (store result)
Redis Result Backend
RedBeat (Scheduler) → Redis → Celery Beat → Periodic Tasks
Installation and Setup
# Install dependencies
pip install "celery[redis]" redis celery-redbeat
# Or with Poetry
poetry add "celery[redis]" redis celery-redbeat
Celery Configuration
# app/celery_app.py
from celery import Celery
from kombu import Queue
def _redis_db_url(db: int) -> str:
    """Return the Redis URL that selects logical database *db*.

    The server location is taken from the ``REDIS_URL`` environment variable
    (for example ``redis://redis:6379`` inside Docker Compose) and defaults to
    ``redis://localhost:6379`` when the variable is unset or empty. Any
    database path already present in the variable is replaced by ``/<db>``.
    """
    # Function-scope imports keep this helper self-contained.
    import os
    from urllib.parse import urlsplit, urlunsplit

    base = os.environ.get("REDIS_URL") or "redis://localhost:6379"
    return urlunsplit(urlsplit(base)._replace(path=f"/{db}"))


def create_celery_app() -> Celery:
    """Build and configure the Celery application.

    Redis is used for three purposes, each on its own logical database:
    message broker (db 0), result backend (db 1) and RedBeat schedule
    store (db 2). The Redis host is configurable through ``REDIS_URL`` so the
    same code works on a developer machine and inside containers.

    Returns:
        The configured ``Celery`` instance.
    """
    # NOTE(review): no task modules are registered here (no ``include=`` /
    # ``autodiscover_tasks``); confirm the worker imports the task modules
    # some other way, otherwise tasks will be reported as unregistered.
    celery = Celery("myapp")
    celery.conf.update(
        # Broker and result backend
        broker_url=_redis_db_url(0),
        result_backend=_redis_db_url(1),
        # Serialization
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
        # Task routing — separate queues by priority
        task_default_queue="default",
        task_queues=(
            Queue("critical", routing_key="critical"),  # WhatsApp sends
            Queue("default", routing_key="default"),  # General tasks
            Queue("bulk", routing_key="bulk"),  # Broadcast campaigns
            Queue("scheduled", routing_key="scheduled"),  # Periodic work
        ),
        # Reliability settings
        task_acks_late=True,  # Ack only after task completes
        task_reject_on_worker_lost=True,  # Re-queue if worker dies
        worker_prefetch_multiplier=1,  # One task at a time per worker
        # Result expiry
        result_expires=86400,  # 24 hours
        # Retry policy defaults
        # NOTE(review): these two keys do not appear among Celery's
        # documented settings; retry limits are normally set per task via
        # ``max_retries`` / ``default_retry_delay``. Confirm they have an
        # effect before relying on them.
        task_max_retries=3,
        task_default_retry_delay=60,
        # RedBeat scheduler
        beat_scheduler="redbeat.RedBeatScheduler",
        redbeat_redis_url=_redis_db_url(2),
        beat_max_loop_interval=5,
    )
    return celery


celery_app = create_celery_app()
Writing Reliable Tasks
# app/tasks/whatsapp.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import httpx
logger = get_task_logger(__name__)
@celery_app.task(
    bind=True,
    name="tasks.send_whatsapp_message",
    queue="critical",
    max_retries=3,
    default_retry_delay=30,
    autoretry_for=(httpx.HTTPError, ConnectionError),
    retry_backoff=True,  # Exponential backoff
    retry_backoff_max=300,  # Max 5 min between retries
    acks_late=True,
)
def send_whatsapp_message(self, phone_number: str, message: dict):
    """Send a single WhatsApp message via Meta Cloud API.

    Args:
        phone_number: Recipient, sent as the ``to`` field of the request.
        message: Extra payload fields merged into the request body. Because
            it is unpacked last, its keys override the defaults
            (``messaging_product``, ``to``, ``type``).

    Returns:
        ``{"status": "sent", "message_id": ...}`` on success, or
        ``{"status": "failed", "error": ...}`` when the API rejects the
        request with a non-retryable 4xx status.

    Raises:
        httpx.HTTPError: On transport errors, 5xx responses, and the
            transient 408/429 statuses. The exception type is listed in
            ``autoretry_for``, so raising it schedules a retry with backoff.
    """
    # 4xx statuses that signal a temporary condition (request timeout,
    # rate limiting) rather than a malformed or unauthorized request.
    transient_client_errors = (408, 429)

    # PHONE_NUMBER_ID and ACCESS_TOKEN are expected to be defined at module
    # level; they are not part of this snippet.
    try:
        logger.info("Sending WhatsApp to %s", phone_number)
        response = httpx.post(
            f"https://graph.facebook.com/v18.0/{PHONE_NUMBER_ID}/messages",
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            json={
                "messaging_product": "whatsapp",
                "to": phone_number,
                "type": "template",
                **message,
            },
            timeout=10.0,
        )
        response.raise_for_status()
        message_id = response.json()["messages"][0]["id"]
        logger.info("Sent successfully: %s", message_id)
        return {"status": "sent", "message_id": message_id}
    except httpx.HTTPStatusError as exc:
        status_code = exc.response.status_code
        # Don't retry on permanent 4xx errors (bad request, unauthorized)
        if 400 <= status_code < 500 and status_code not in transient_client_errors:
            logger.error("Non-retryable error: %s", exc)
            return {"status": "failed", "error": str(exc)}
        raise  # Retry on 5xx and on transient 408/429
@celery_app.task(
    name="tasks.process_broadcast_campaign",
    queue="bulk",
    rate_limit="100/m",  # Max 100 tasks per minute
)
def process_broadcast_campaign(campaign_id: str):
    """Fan out a broadcast campaign to individual send tasks.

    One ``send_whatsapp_message`` task is queued per recipient; once all of
    them have finished, ``mark_campaign_complete`` runs as the chord callback.

    Args:
        campaign_id: Identifier passed to ``get_campaign_recipients`` and to
            the completion callback.

    Returns:
        ``{"queued": <number of recipients>}``.
    """
    from celery import chord, group

    from app.services.campaigns import get_campaign_recipients

    # Materialize once: the recipients are iterated for the fan-out and
    # counted for the return value, which a one-shot iterator cannot support.
    recipients = list(get_campaign_recipients(campaign_id))

    # Use chord for parallel execution with callback.
    # The send task defaults to the "critical" queue; campaign sends are
    # routed to "bulk" so a large broadcast cannot delay critical messages.
    send_tasks = group(
        send_whatsapp_message.s(
            phone_number=r.phone,
            message=r.message_payload,
        ).set(queue="bulk")
        for r in recipients
    )

    # Execute all sends, then update campaign status.
    # NOTE(review): mark_campaign_complete is not defined in this snippet;
    # it must be importable in this module.
    chord(send_tasks)(mark_campaign_complete.s(campaign_id=campaign_id))
    return {"queued": len(recipients)}
Periodic Tasks with RedBeat
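RedBeat stores the beat schedule in Redis instead of a local file and uses a Redis lock so that only one beat instance is active at a time, which makes it safe to run in multi-instance environments (like Kubernetes). Unlike the default scheduler, whose schedule lives in a file on the local disk of the beat process, RedBeat schedules survive container restarts and rescheduling, and they can be updated at runtime without restarting beat.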
# app/tasks/scheduled.py
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab
from app.celery_app import celery_app
@celery_app.task(name="tasks.process_scheduled_messages", queue="scheduled")
def process_scheduled_messages():
    """Check for messages scheduled to send in the next minute.

    Every message returned by ``get_due_messages`` is handed to
    ``send_whatsapp_message`` on the "critical" queue.

    Returns:
        ``{"processed": <number of messages queued>}``.
    """
    from app.services.scheduler import get_due_messages
    # Imported here because this module's top-level imports do not bring the
    # send task into scope.
    from app.tasks.whatsapp import send_whatsapp_message

    # Materialize once: the messages are both iterated and counted.
    due_messages = list(get_due_messages())
    for msg in due_messages:
        send_whatsapp_message.apply_async(
            args=[msg.phone_number, msg.payload],
            queue="critical",
        )
    return {"processed": len(due_messages)}
# Register periodic tasks programmatically
def register_periodic_tasks():
    """Create the RedBeat entries for the periodic jobs and save each one.

    All entries are built first; each is then saved and a confirmation line
    is printed for it.
    """
    # (entry name, task name, schedule)
    schedule_specs = (
        (
            "process-scheduled-messages",
            "tasks.process_scheduled_messages",
            crontab(minute="*"),  # Every minute
        ),
        (
            "cleanup-expired-sessions",
            "tasks.cleanup_expired_sessions",
            crontab(hour=2, minute=0),  # 2 AM daily
        ),
    )

    pending = [
        (
            entry_name,
            RedBeatSchedulerEntry(
                name=entry_name,
                task=task_path,
                schedule=when,
                app=celery_app,
            ),
        )
        for entry_name, task_path, when in schedule_specs
    ]

    for entry_name, scheduler_entry in pending:
        scheduler_entry.save()
        print(f"Registered periodic task: {entry_name}")
FastAPI Integration
# app/api/messages.py
from fastapi import APIRouter, BackgroundTasks
from app.tasks.whatsapp import send_whatsapp_message
router = APIRouter()
@router.post("/send-message")
async def send_message(request: MessageRequest):
    """Queue a WhatsApp send on Celery and respond without waiting for it.

    Returns a payload with the Celery task id and the path of the endpoint
    that reports the task's status.
    """
    dispatch_options = {
        "queue": "critical",
        "countdown": 0,  # Execute immediately
    }
    # Fire and forget: the send is handed to Celery, nothing is awaited here.
    queued = send_whatsapp_message.apply_async(
        args=[request.phone_number, request.message],
        **dispatch_options,
    )

    # Respond right away — the task itself runs in the background.
    response_body = {"status": "queued"}
    response_body["task_id"] = queued.id
    response_body["check_status"] = f"/tasks/{queued.id}"
    return response_body
@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Report the current state of a queued send task.

    Args:
        task_id: Celery task id, as returned by the send endpoint.

    Returns:
        A dict with ``task_id``, ``status`` and ``result``. ``result`` is
        ``None`` until the task has finished; for a task that ended with an
        exception it is that exception's message as a string.
    """
    task = send_whatsapp_message.AsyncResult(task_id)

    result = task.result if task.ready() else None
    if isinstance(result, BaseException):
        # A failed (or revoked) task exposes the exception instance as its
        # result; that has no useful JSON representation, so return its
        # message instead.
        result = str(result)

    return {
        "task_id": task_id,
        # PENDING, STARTED, SUCCESS, FAILURE
        # NOTE(review): Celery only reports STARTED when ``task_track_started``
        # is enabled; it is not set in the configuration shown in this article.
        "status": task.status,
        "result": result,
    }
Docker Compose Setup
# Redis plus one Celery worker per queue group, the beat scheduler and Flower.
services:
  redis:
    image: redis:7-alpine
    # Append-only file persistence so queued tasks survive a Redis restart.
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

  worker-critical:
    build: .
    command: celery -A app.celery_app worker -Q critical -c 4 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  worker-bulk:
    build: .
    command: celery -A app.celery_app worker -Q bulk -c 2 --loglevel=info
    # Every Celery container needs the Redis address, not only worker-critical.
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  worker-default:
    build: .
    command: celery -A app.celery_app worker -Q default,scheduled -c 4 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  celery-beat:
    build: .
    command: celery -A app.celery_app beat --scheduler redbeat.RedBeatScheduler --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  flower:
    build: .
    # Requires the `flower` package to be installed in the image.
    command: celery -A app.celery_app flower --port=5555
    environment:
      - REDIS_URL=redis://redis:6379
    ports:
      - "5555:5555"
    depends_on:
      - redis

# Named volumes must be declared at the top level to be usable by services.
volumes:
  redis_data:
Monitoring with Flower
Flower is a real-time web UI for monitoring Celery workers and tasks. It ships as a separate package (`pip install flower`), so make sure it is installed in the image the `flower` service runs. Access it at http://localhost:5555 to see active workers, task throughput, failure rates, task execution times, and queue depths. For production, add basic auth and expose it only internally or via VPN.
Production Checklist
| Item | Why It Matters | Setting |
|---|---|---|
| `task_acks_late=True` | Tasks survive worker crashes | Global or per-task |
| `worker_prefetch_multiplier=1` | Fair distribution, no starvation | Worker config |
| Separate queues by priority | Critical tasks never blocked by bulk | Architecture |
| Idempotent task logic | Safe to retry without duplicates | Code design |
| Redis persistence (`appendonly yes`) | Queue survives Redis restart | Redis config |
| Dead Letter Queue | Capture permanently failed tasks | Custom error handler |
For the full Messenjo infrastructure stack, see our guides on multi-tenant SaaS architecture, FastAPI development services, and the upcoming Docker + Traefik production deployment guide.
