(302) 414-9101
1001 S Main St, STE 600, Kalispell, MT 59901
contact@zarghamlabs.com

Blog Details

Celery + Redis: Building Background Task Queues for Python SaaS in 2026

Why Python SaaS Applications Need Background Task Queues

Any SaaS application doing real work — sending emails, processing webhooks, generating reports, running AI inference, syncing data — cannot do those things synchronously inside a web request. A user hits an endpoint, your server sends a WhatsApp message, and they wait 3 seconds for a response. That’s not acceptable. Celery with Redis is the standard solution for Python SaaS: offload slow work to background workers, respond instantly, and process at scale.

This is the exact stack powering Messenjo — every WhatsApp message sent, every broadcast campaign, every scheduled automation runs through Celery workers backed by Redis. Here’s how to build it correctly from the start.

The Core Architecture

FastAPI App → Celery Task (enqueue) → Redis Broker → Celery Worker → Execute Task
                                                            ↓ (store result)
                                                         Redis Result Backend

RedBeat (Scheduler) → Redis → Celery Beat → Periodic Tasks

Installation and Setup

# Install dependencies
pip install celery[redis] redis redbeat

# Or with Poetry
poetry add celery redis redbeat

Celery Configuration

# app/celery_app.py
from celery import Celery
from kombu import Queue

def create_celery_app() -> Celery:
    celery = Celery("myapp")
    
    celery.conf.update(
        # Broker and result backend
        broker_url="redis://localhost:6379/0",
        result_backend="redis://localhost:6379/1",
        
        # Serialization
        task_serializer="json",
        result_serializer="json",
        accept_content=["json"],
        
        # Task routing — separate queues by priority
        task_default_queue="default",
        task_queues=(
            Queue("critical", routing_key="critical"),   # WhatsApp sends
            Queue("default", routing_key="default"),     # General tasks
            Queue("bulk", routing_key="bulk"),           # Broadcast campaigns
            Queue("scheduled", routing_key="scheduled"), # Periodic work
        ),
        
        # Reliability settings
        task_acks_late=True,          # Ack only after task completes
        task_reject_on_worker_lost=True,  # Re-queue if worker dies
        worker_prefetch_multiplier=1, # One task at a time per worker
        
        # Result expiry
        result_expires=86400,  # 24 hours
        
        # Retry policy defaults
        task_max_retries=3,
        task_default_retry_delay=60,
        
        # RedBeat scheduler
        beat_scheduler="redbeat.RedBeatScheduler",
        redbeat_redis_url="redis://localhost:6379/2",
        beat_max_loop_interval=5,
    )
    
    return celery

celery_app = create_celery_app()

Writing Reliable Tasks

# app/tasks/whatsapp.py
from app.celery_app import celery_app
from celery.utils.log import get_task_logger
import httpx

logger = get_task_logger(__name__)

@celery_app.task(
    bind=True,
    name="tasks.send_whatsapp_message",
    queue="critical",
    max_retries=3,
    default_retry_delay=30,
    autoretry_for=(httpx.HTTPError, ConnectionError),
    retry_backoff=True,        # Exponential backoff
    retry_backoff_max=300,     # Max 5 min between retries
    acks_late=True,
)
def send_whatsapp_message(self, phone_number: str, message: dict):
    """Send a single WhatsApp message via Meta Cloud API."""
    try:
        logger.info(f"Sending WhatsApp to {phone_number}")
        
        response = httpx.post(
            f"https://graph.facebook.com/v18.0/{PHONE_NUMBER_ID}/messages",
            headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
            json={
                "messaging_product": "whatsapp",
                "to": phone_number,
                "type": "template",
                **message
            },
            timeout=10.0
        )
        response.raise_for_status()
        
        message_id = response.json()["messages"][0]["id"]
        logger.info(f"Sent successfully: {message_id}")
        return {"status": "sent", "message_id": message_id}
        
    except httpx.HTTPStatusError as exc:
        # Don't retry on 4xx errors (bad request, unauthorized)
        if 400 <= exc.response.status_code < 500:
            logger.error(f"Non-retryable error: {exc}")
            return {"status": "failed", "error": str(exc)}
        raise  # Retry on 5xx


@celery_app.task(
    name="tasks.process_broadcast_campaign",
    queue="bulk",
    rate_limit="100/m",  # Max 100 tasks per minute
)
def process_broadcast_campaign(campaign_id: str):
    """Fan out a broadcast campaign to individual send tasks."""
    from app.services.campaigns import get_campaign_recipients
    
    recipients = get_campaign_recipients(campaign_id)
    
    # Use chord for parallel execution with callback
    from celery import chord, group
    
    send_tasks = group(
        send_whatsapp_message.s(
            phone_number=r.phone,
            message=r.message_payload
        )
        for r in recipients
    )
    
    # Execute all sends, then update campaign status
    chord(send_tasks)(mark_campaign_complete.s(campaign_id=campaign_id))
    
    return {"queued": len(recipients)}

Periodic Tasks with RedBeat

RedBeat stores the beat schedule in Redis instead of a file, making it safe to run in multi-instance environments (like Kubernetes). Unlike the default file-based scheduler, RedBeat schedules survive worker restarts and can be updated at runtime without restarting.

# app/tasks/scheduled.py
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab
from app.celery_app import celery_app

@celery_app.task(name="tasks.process_scheduled_messages", queue="scheduled")
def process_scheduled_messages():
    """Check for messages scheduled to send in the next minute."""
    from app.services.scheduler import get_due_messages
    
    due_messages = get_due_messages()
    for msg in due_messages:
        send_whatsapp_message.apply_async(
            args=[msg.phone_number, msg.payload],
            queue="critical"
        )
    return {"processed": len(due_messages)}


# Register periodic tasks programmatically
def register_periodic_tasks():
    entries = {
        "process-scheduled-messages": RedBeatSchedulerEntry(
            name="process-scheduled-messages",
            task="tasks.process_scheduled_messages",
            schedule=crontab(minute="*"),  # Every minute
            app=celery_app,
        ),
        "cleanup-expired-sessions": RedBeatSchedulerEntry(
            name="cleanup-expired-sessions",
            task="tasks.cleanup_expired_sessions",
            schedule=crontab(hour=2, minute=0),  # 2 AM daily
            app=celery_app,
        ),
    }
    
    for name, entry in entries.items():
        entry.save()
        print(f"Registered periodic task: {name}")

FastAPI Integration

# app/api/messages.py
from fastapi import APIRouter, BackgroundTasks
from app.tasks.whatsapp import send_whatsapp_message

router = APIRouter()

@router.post("/send-message")
async def send_message(request: MessageRequest):
    # Don't await — fire and forget to Celery
    task = send_whatsapp_message.apply_async(
        args=[request.phone_number, request.message],
        queue="critical",
        countdown=0,  # Execute immediately
    )
    
    # Return immediately — task runs in background
    return {
        "status": "queued",
        "task_id": task.id,
        "check_status": f"/tasks/{task.id}"
    }

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = send_whatsapp_message.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task.status,  # PENDING, STARTED, SUCCESS, FAILURE
        "result": task.result if task.ready() else None,
    }

Docker Compose Setup

services:
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

  worker-critical:
    build: .
    command: celery -A app.celery_app worker -Q critical -c 4 --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  worker-bulk:
    build: .
    command: celery -A app.celery_app worker -Q bulk -c 2 --loglevel=info
    depends_on:
      - redis

  worker-default:
    build: .
    command: celery -A app.celery_app worker -Q default,scheduled -c 4 --loglevel=info
    depends_on:
      - redis

  celery-beat:
    build: .
    command: celery -A app.celery_app beat --scheduler redbeat.RedBeatScheduler --loglevel=info
    depends_on:
      - redis

  flower:
    build: .
    command: celery -A app.celery_app flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis

Monitoring with Flower

Flower is a real-time web UI for monitoring Celery workers and tasks. Access it at http://localhost:5555 to see: active workers, task throughput, failure rates, task execution times, and queue depths. For production, add basic auth and expose only internally or via VPN.

Production Checklist

Item Why It Matters Setting
task_acks_late=True Tasks survive worker crashes Global or per-task
worker_prefetch_multiplier=1 Fair distribution, no starvation Worker config
Separate queues by priority Critical tasks never blocked by bulk Architecture
Idempotent task logic Safe to retry without duplicates Code design
Redis persistence (appendonly yes) Queue survives Redis restart Redis config
Dead Letter Queue Capture permanently failed tasks Custom error handler

For the full Messenjo infrastructure stack, see our guides on multi-tenant SaaS architecture, FastAPI development services, and the upcoming Docker + Traefik production deployment guide.

Leave A Comment