# Queue
Application Kit provides a Redis-backed job queue system for background processing.
## Overview

The queue system uses Redis lists to manage jobs through three states:

- `todo`: Jobs waiting to be processed
- `doing`: Jobs currently being processed
- `failed`: Jobs that failed with exceptions
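Roughly, a job moves through these lists as sketched below; this is a condensed overview, and each call is covered in detail in the sections that follow.

```python
"""End-to-end job lifecycle (sketch)."""
import traceback

from application_kit.queue.cua import Queue, QueueConfig

queue = Queue(QueueConfig(name="metrics_queue", queue_prefix="counter"))

# put() adds the job to the todo list.
job_id = queue.put({"action": "count", "endpoint": "/api/search", "count": 1})

# get() hands the job to a consumer and moves it to the doing list.
job_id, job_data = queue.get()

try:
    ...  # do the actual work with job_data["data"]
    # mark_done() removes the completed job from the doing list.
    queue.mark_done(job_id)
except Exception as exc:
    # mark_failed() moves the job to the failed list with the exception details.
    queue.mark_failed(job_id, exc, traceback.format_exc())
```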
## Configuration

Configure the Redis database backing the queue in `application.json`:
```json
{
  "dependencies": {
    "databases": [
      {
        "name": "metrics_queue",
        "type": "redis",
        "extras": {"queue_prefix": "counter"}
      }
    ]
  }
}
```

The `queue_prefix` determines the Redis key prefix for the queue lists.
## Queue Class

### Basic Usage
"""Queue basic usage examples."""
from application_kit.queue.cua import Queue, QueueConfig
# Creating a queue
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)
# Adding jobs
job_id = queue.put({"action": "count", "endpoint": "/api/search", "count": 1})
# Processing jobs
result = queue.get()
if result[0] is not None:
job_id, job_data = result
if job_data is not None:
# Process the job
print(job_data["data"])
queue.mark_done(job_id)
# Queue statistics
todo_count = queue.get_todo_count()
doing_count = queue.get_doing_count()
failed_count = queue.get_failed_count()
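Put together, a simple polling consumer looks something like the sketch below. It assumes `get()` returns `(None, None)` when the queue is empty, which is what the `result[0] is not None` check above implies.

```python
"""Minimal polling consumer (sketch)."""
import time

from application_kit.queue.cua import Queue, QueueConfig

queue = Queue(QueueConfig(name="metrics_queue", queue_prefix="counter"))

while True:
    job_id, job_data = queue.get()
    if job_id is None:
        # Queue is empty; back off briefly before polling again.
        time.sleep(1)
        continue
    if job_data is not None:
        print(job_data["data"])
        queue.mark_done(job_id)
```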
### Handling Failures
"""Queue error handling example."""
import traceback
from typing import Any
from application_kit.queue.cua import Queue, QueueConfig
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)
def process(data: Any) -> None:
"""Process job data."""
print(data)
result = queue.get()
if result[0] is not None:
job_id, job_data = result
if job_data is not None:
try:
process(job_data["data"])
queue.mark_done(job_id)
except Exception as e:
queue.mark_failed(job_id, e, traceback.format_exc())
## Workers

### Basic Worker

The `Worker` class processes jobs in a background thread:
"""Queue worker examples."""
from application_kit.queue.cua import JobData, Queue, QueueConfig, Worker, WorkerBase
def handle_job(job_id: str, data: JobData | None) -> None:
"""Process the job data."""
print(f"Processing job {job_id}: {data}")
# Basic worker
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
queue = Queue(config)
worker = Worker(queue, handle_job)
worker.start()
# Custom worker with hooks
class MyWorker(WorkerBase):
"""Worker with custom lifecycle hooks."""
def before_job_hook(self) -> None:
"""Called before processing each job."""
pass
def after_job_hook(self) -> None:
"""Called after processing each job."""
pass
### Django Integration

When Django is installed, the `Worker` class automatically:
- Closes old database connections before each job
- Resets query logs before each job
- Closes connections after each job
This prevents connection pooling issues in long-running workers.
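The effect is roughly equivalent to the hooks sketched below. You do not need to write this yourself; the class is illustrative only and shows approximately what `Worker` does when Django is present.

```python
"""Illustration of the Django cleanup performed around each job."""
from django.db import close_old_connections, connections, reset_queries

from application_kit.queue.cua import WorkerBase


class DjangoCleanupWorker(WorkerBase):
    """Roughly what the built-in Worker does when Django is installed."""

    def before_job_hook(self) -> None:
        # Drop connections that have exceeded their maximum age and clear
        # the query log so it does not grow in a long-running worker.
        close_old_connections()
        reset_queries()

    def after_job_hook(self) -> None:
        # Release all database connections once the job has finished.
        connections.close_all()
```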
### Thread Pool

For concurrent processing, use `ThreadPool`:
"""Queue thread pool example."""
from application_kit.queue.cua import JobData, QueueConfig, ThreadPool
def handle_job(job_id: str, data: JobData | None) -> None:
"""Process job data."""
print(f"Processing {job_id}: {data}")
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
pool = ThreadPool(num_threads=4, func=handle_job, config=config)
# Workers start automatically
# ...
# Graceful shutdown
pool.join()
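Work can be enqueued from any process through a `Queue` built with the same configuration while the pool's workers consume it. The producer below is a sketch; the endpoints are placeholder values.

```python
"""Producing jobs for a thread pool (sketch)."""
from application_kit.queue.cua import Queue, QueueConfig

config = QueueConfig(name="metrics_queue", queue_prefix="counter")
producer = Queue(config)

# Placeholder payloads; the pool's workers pick these up as they arrive.
for endpoint in ("/api/search", "/api/users"):
    producer.put({"action": "count", "endpoint": endpoint, "count": 1})
```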
## Job Filtering

Filter which jobs are accepted using `should_accept_job`:
"""Queue job filtering example."""
from collections.abc import Mapping
from typing import Any
from application_kit.queue.cua import Queue, QueueConfig
def should_accept(data: Mapping[str, Any]) -> bool:
"""Only accept jobs for specific endpoints."""
endpoint = data.get("endpoint", "")
return isinstance(endpoint, str) and endpoint.startswith("/api/v2/")
config = QueueConfig(name="metrics_queue", queue_prefix="counter")
config.should_accept_job = should_accept
queue = Queue(config)
## Job Data Structure
Jobs are stored with metadata:
```python
{
    "t": 1234567890.123,   # Unix timestamp when the job was created
    "data": {...},         # Your job payload
    "exception": {         # Only present for failed jobs
        "message": "Error message",
        "trace": "Full stack trace",
    },
}
```
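The same structure comes back when a job is fetched, so the metadata can be read directly from the returned mapping. The snippet below is a sketch and assumes a job is available.

```python
"""Reading job metadata after a fetch (sketch)."""
from application_kit.queue.cua import Queue, QueueConfig

queue = Queue(QueueConfig(name="metrics_queue", queue_prefix="counter"))

job_id, job_data = queue.get()
if job_data is not None:
    created_at = job_data["t"]   # Unix timestamp set when the job was enqueued
    payload = job_data["data"]   # the dictionary passed to queue.put()
    print(job_id, created_at, payload)
    queue.mark_done(job_id)
```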
## Redis Key Structure

For a queue with prefix `counter`:
| Key | Type | Purpose |
|---|---|---|
| `counter:todo` | List | Jobs waiting to be processed |
| `counter:doing` | List | Jobs currently being processed |
| `counter:failed` | List | Jobs that failed |
| `counter-{uuid}` | String | Individual job data |
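For ad-hoc debugging or monitoring you can read these keys with a plain Redis client. The connection details below are assumptions; use whatever host and port your `metrics_queue` database points at.

```python
"""Inspecting the underlying Redis lists directly (sketch)."""
import redis

# Assumed connection details; match your configured Redis instance.
r = redis.Redis(host="localhost", port=6379, db=0)

for state in ("todo", "doing", "failed"):
    key = f"counter:{state}"
    print(key, r.llen(key))
```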
## Error Handling

Failed jobs are moved to the failed queue with exception details. You can build retry logic or monitoring around the failed queue count.
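For example, a monitoring loop might periodically check the failed count and alert when it grows. The threshold, interval, and logging below are placeholders.

```python
"""Monitoring the failed queue (sketch)."""
import logging
import time

from application_kit.queue.cua import Queue, QueueConfig

logger = logging.getLogger(__name__)

queue = Queue(QueueConfig(name="metrics_queue", queue_prefix="counter"))

FAILED_THRESHOLD = 10  # placeholder value

while True:
    failed = queue.get_failed_count()
    if failed > FAILED_THRESHOLD:
        logger.warning("Queue has %d failed jobs", failed)
    time.sleep(60)
```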
## Best Practices
- Idempotent handlers: Design job handlers to be safely re-runnable (see the sketch after this list)
- Graceful shutdown: Use `pool.join()` to wait for workers to finish
- Monitor queues: Track todo/doing/failed counts for operational visibility
- Handle Redis failures: The queue logs Redis errors but continues operating