Skip to content

Queue

Application Kit provides a Redis-backed job queue system for background processing.

Overview

The queue system uses Redis lists to manage jobs through three states:

  • todo: Jobs waiting to be processed
  • doing: Jobs currently being processed
  • failed: Jobs that failed with exceptions

Configuration

Configure your queue Redis in application.json:

{
  "dependencies": {
    "databases": [
      {
        "name": "metrics_queue",
        "type": "redis",
        "extras": {"queue_prefix": "counter"}
      }
    ]
  }
}

The queue_prefix determines the Redis key prefix for the queue lists.

Queue Class

Basic Usage

"""Queue basic usage examples."""

from application_kit.queue.cua import Queue, QueueConfig

# Creating a queue
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)

# Adding jobs
job_id = queue.put({"action": "count", "endpoint": "/api/search", "count": 1})

# Processing jobs
result = queue.get()
if result[0] is not None:
    job_id, job_data = result
    if job_data is not None:
        # Process the job
        print(job_data["data"])
        queue.mark_done(job_id)

# Queue statistics
todo_count = queue.get_todo_count()
doing_count = queue.get_doing_count()
failed_count = queue.get_failed_count()

Handling Failures

"""Queue error handling example."""

import traceback
from typing import Any

from application_kit.queue.cua import Queue, QueueConfig

config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)


def process(data: Any) -> None:
    """Process job data."""
    print(data)


result = queue.get()
if result[0] is not None:
    job_id, job_data = result
    if job_data is not None:
        try:
            process(job_data["data"])
            queue.mark_done(job_id)
        except Exception as e:
            queue.mark_failed(job_id, e, traceback.format_exc())

Workers

Basic Worker

The Worker class processes jobs in a background thread:

"""Queue worker examples."""

from application_kit.queue.cua import JobData, Queue, QueueConfig, Worker, WorkerBase


def handle_job(job_id: str, data: JobData | None) -> None:
    """Process the job data."""
    print(f"Processing job {job_id}: {data}")


# Basic worker
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)
worker = Worker(queue, handle_job)
worker.start()


# Custom worker with hooks
class MyWorker(WorkerBase):
    """Worker with custom lifecycle hooks."""

    def before_job_hook(self) -> None:
        """Called before processing each job."""
        pass

    def after_job_hook(self) -> None:
        """Called after processing each job."""
        pass

Django Integration

When Django is installed, the Worker class automatically:

  • Closes old database connections before each job
  • Resets query logs before each job
  • Closes connections after each job

This prevents connection pooling issues in long-running workers.

Thread Pool

For concurrent processing, use ThreadPool:

"""Queue thread pool example."""

from application_kit.queue.cua import JobData, QueueConfig, ThreadPool


def handle_job(job_id: str, data: JobData | None) -> None:
    """Process job data."""
    print(f"Processing {job_id}: {data}")


config = QueueConfig(name="metrics_queue", queue_prefix="counter")
pool = ThreadPool(num_threads=4, func=handle_job, config=config)

# Workers start automatically
# ...

# Graceful shutdown
pool.join()

Job Filtering

Filter which jobs are accepted using should_accept_job:

"""Queue job filtering example."""

from collections.abc import Mapping
from typing import Any

from application_kit.queue.cua import Queue, QueueConfig


def should_accept(data: Mapping[str, Any]) -> bool:
    """Only accept jobs for specific endpoints."""
    endpoint = data.get("endpoint", "")
    return isinstance(endpoint, str) and endpoint.startswith("/api/v2/")


config = QueueConfig(name="metrics_queue", queue_prefix="counter")
config.should_accept_job = should_accept

queue = Queue(config)

Job Data Structure

Jobs are stored with metadata:

{
    "t": 1234567890.123,  # Unix timestamp when job was created
    "data": {...},        # Your job payload
    "exception": {        # Only present for failed jobs
        "message": "Error message",
        "trace": "Full stack trace"
    }
}

Redis Key Structure

For a queue with prefix counter:

Key Type Purpose
counter:todo List Jobs waiting to be processed
counter:doing List Jobs currently being processed
counter:failed List Jobs that failed
counter-{uuid} String Individual job data

Error Handling

Failed jobs are moved to the failed queue with exception details. You can build retry logic or monitoring around the failed queue count.

Best Practices

  1. Idempotent handlers: Design job handlers to be safely re-runnable
  2. Graceful shutdown: Use pool.join() to wait for workers to finish
  3. Monitor queues: Track todo/doing/failed counts for operational visibility
  4. Handle Redis failures: The queue logs Redis errors but continues operating