Skip to content

Metrics

Application Kit provides a Redis-backed queue system for counting API requests. This enables usage tracking and billing calculations by recording how many requests each project makes to different products and endpoints.

Metrics Queue

The metrics queue records request counts that are processed asynchronously by a worker. Use add_job_to_queue_for_token() after authenticating a request to count it. The function:

  • Extracts organization and project information from ProjectTokenModel
  • Respects the should_count context flag (internal requests on port 8000 are excluded)
  • Tracks request source via X-SDK-Source header or AI-Context: true header

Queue Infrastructure

The Queue class provides a reliable Redis-backed job queue with support for job tracking and failure handling. Jobs move through three Redis lists:

  • todo: Pending jobs waiting to be processed
  • doing: Jobs currently being processed
  • failed: Jobs that encountered errors

The ThreadPool and Worker classes provide background processing with proper Django database connection management via signals.

application_kit.metrics.queue

Core implementation of the metrics queue client interaction.

# get the project token on Django-based applications
project_token = get_project_token(request)

# on FastAPI-based applications:
project_token = request.state.project_token
# or through a dependency like AuthenticateKey.

add_job_to_queue_for_token(project_token, "MAPS", "load")

# if needed we can specify the count

add_job_to_queue_for_token(project_token, "STORES", "import", count=435)

Tip

The request header X-SDK-Source is internally used by SDKs to populate the job source field.

The request header AI-Context (boolean value) is used to populate the job source field.

build_job_data

build_job_data(
    product,
    kind,
    organization,
    counter=1,
    timestamp=None,
    source=None,
)

Builds a metrics job from the parameters.

Warning

This is meant to be used in cases no handled by add_job_to_queue_for_token.

PARAMETER DESCRIPTION
product

The product key

TYPE: str

kind

The kind belonging to the project

TYPE: str

organization

The consumption will be added to organization/project

TYPE: KeyOrganizationModel | ProjectReference

counter

How many queries should be counted

TYPE: int DEFAULT: 1

Source code in application_kit/metrics/queue.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def build_job_data(
    product: Annotated[str, Doc("The product key")],
    kind: Annotated[str, Doc("The kind belonging to the project")],
    organization: Annotated[
        KeyOrganizationModel | ProjectReference, Doc("The consumption will be added to organization/project")
    ],
    counter: Annotated[int, Doc("How many queries should be counted")] = 1,
    timestamp: float | None = None,
    source: str | None = None,
) -> CounterJobDict:
    """
    Builds a metrics job from the parameters.
    !!! warning
        This is meant to be used in cases no handled by
        [add_job_to_queue_for_token][application_kit.metrics.queue.add_job_to_queue_for_token].
    """
    ts: float = timestamp or time()

    if all(arg is not None for arg in [product, kind, organization]):
        serialized_organization: CounterOrganizationDict

        if isinstance(organization, KeyOrganizationModel):
            serialized_organization = {"pk": organization.pk, "project_pk": organization.project_pk}
        else:
            if organization.organization_id is None or organization.project_id is None:
                raise PermissionDenied

            serialized_organization = {"pk": organization.organization_id, "project_pk": organization.project_id}

        data: CounterJobDict = {
            "product": product,
            "kind": kind,
            "ts": ts,
            "organization": serialized_organization,
            "counter": counter,
            "source": source,
        }

        return data
    else:
        raise RuntimeError("Invalid job.")

add_job_to_queue_for_token

add_job_to_queue_for_token(
    project_token, product, kind, count=1
)

Adds a metrics job to the queue, taking into account the project_token.context.should_count context. Pass source from project_token context

PARAMETER DESCRIPTION
project_token

The project token obtained from an authenticated request.

TYPE: ProjectTokenModel

Source code in application_kit/metrics/queue.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def add_job_to_queue_for_token(
    project_token: Annotated[ProjectTokenModel, Doc("The project token obtained from an authenticated request.")],
    product: str,
    kind: str,
    count: int = 1,
) -> None:
    """
    Adds a metrics job to the queue, taking into account the [project_token.context.should_count][application_kit.authenticator.types.ProjectTokenModel] context.
    Pass source from project_token context
    """
    if project_token and project_token.context.should_count:
        reference = project_token.reference
        if reference is None or reference.organization_id is None or reference.project_id is None:
            raise PermissionDenied

        source = project_token.context.source

        data = build_job_data(product, kind, reference, counter=count, source=source)
        queue = Queue(queue_configuration)
        queue.put(data)

application_kit.queue.cua

JobHandler

Bases: Protocol

__call__

__call__(job_id, data)

Job Handler call protocol

Source code in application_kit/queue/cua.py
131
132
def __call__(self, job_id: str, data: JobData | None) -> None:
    """Job Handler call protocol"""

WorkerBase

WorkerBase(queue, func)

Bases: Thread

Thread executing tasks from a given tasks queue

Source code in application_kit/queue/cua.py
138
139
140
141
142
143
def __init__(self, queue: Queue, func: JobHandler) -> None:
    super().__init__()
    self.queue = queue
    self.daemon = True
    self.should_exit = False
    self.func = func

before_job_hook

before_job_hook()

Ran before job is ran

Source code in application_kit/queue/cua.py
150
151
def before_job_hook(self) -> None:
    """Ran before job is ran"""

after_job_hook

after_job_hook()

Ran after job has been completed.

Source code in application_kit/queue/cua.py
153
154
def after_job_hook(self) -> None:
    """Ran after job has been completed."""

ThreadPool

ThreadPool(num_threads, func, config)

Pool of threads consuming tasks from a queue

Source code in application_kit/queue/cua.py
201
202
203
204
205
206
def __init__(self, num_threads: int, func: JobHandler, config: QueueConfig) -> None:
    self.queue = Queue(config)
    self.workers = [Worker(self.queue, func) for _ in range(num_threads)]

    for worker in self.workers:
        worker.start()