Skip to content

Metrics

application_kit.metrics.queue

Core implementation of the metrics queue client interaction.

# On Django-based applications, get the project token from the request:
project_token = get_project_token(request)

# On FastAPI-based applications, read it from the request state:
project_token = request.state.project_token
# or obtain it through a dependency like AuthenticateKey.

# Queue one "load" job for the "MAPS" product (count defaults to 1).
add_job_to_queue_for_token(project_token, "MAPS", "load")

# If needed, the count can be specified explicitly:

add_job_to_queue_for_token(project_token, "STORES", "import", count=435)

Tip

The request header X-SDK-Source is internally used by SDKs to populate the job source field.

The request header AI-Context (boolean value) is used to populate the job source field.

build_job_data

build_job_data(
    product,
    kind,
    organization,
    counter=1,
    timestamp=None,
    source=None,
)

Builds a metrics job from the parameters.

Warning

This is meant to be used in cases not handled by add_job_to_queue_for_token.

PARAMETER DESCRIPTION
product

The product key

TYPE: str

kind

The kind belonging to the project

TYPE: str

organization

The consumption will be added to organization/project

TYPE: KeyOrganizationModel | ProjectReference

counter

How many queries should be counted

TYPE: int DEFAULT: 1

Source code in application_kit/metrics/queue.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def build_job_data(
    product: Annotated[str, Doc("The product key")],
    kind: Annotated[str, Doc("The kind belonging to the project")],
    organization: Annotated[
        KeyOrganizationModel | ProjectReference, Doc("The consumption will be added to organization/project")
    ],
    counter: Annotated[int, Doc("How many queries should be counted")] = 1,
    timestamp: Annotated[
        float | None, Doc("Value stored in the job's `ts` field; `time()` is used when omitted")
    ] = None,
    source: Annotated[str | None, Doc("Value stored as-is in the job's `source` field")] = None,
) -> CounterJobDict:
    """
    Builds a metrics job from the parameters.

    !!! warning
        This is meant to be used in cases not handled by
        [add_job_to_queue_for_token][application_kit.metrics.queue.add_job_to_queue_for_token].

    Raises:
        RuntimeError: If `product`, `kind` or `organization` is `None`.
        PermissionDenied: If `organization` is not a `KeyOrganizationModel` and its
            `organization_id` or `project_id` is `None`.
    """
    # Reject incomplete jobs up front so the happy path below stays flat.
    if product is None or kind is None or organization is None:
        raise RuntimeError("Invalid job.")

    # Compare against None instead of relying on truthiness, so that an explicit
    # timestamp of 0 is kept rather than silently replaced by the current time.
    ts: float = time() if timestamp is None else timestamp

    serialized_organization: CounterOrganizationDict
    if isinstance(organization, KeyOrganizationModel):
        serialized_organization = {"pk": organization.pk, "project_pk": organization.project_pk}
    else:
        # Anything else is treated as a project reference, which must carry both ids.
        if organization.organization_id is None or organization.project_id is None:
            raise PermissionDenied

        serialized_organization = {"pk": organization.organization_id, "project_pk": organization.project_id}

    data: CounterJobDict = {
        "product": product,
        "kind": kind,
        "ts": ts,
        "organization": serialized_organization,
        "counter": counter,
        "source": source,
    }

    return data

add_job_to_queue_for_token

add_job_to_queue_for_token(
    project_token, product, kind, count=1
)

Adds a metrics job to the queue, taking into account the project_token.context.should_count flag. The job source is taken from the project_token context.

PARAMETER DESCRIPTION
project_token

The project token obtained from an authenticated request.

TYPE: ProjectTokenModel

Source code in application_kit/metrics/queue.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def add_job_to_queue_for_token(
    project_token: Annotated[ProjectTokenModel, Doc("The project token obtained from an authenticated request.")],
    product: str,
    kind: str,
    count: int = 1,
) -> None:
    """
    Queues a metrics job for the project referenced by `project_token`.

    Nothing is queued when the token is falsy or when
    [project_token.context.should_count][application_kit.authenticator.types.ProjectTokenModel] is falsy.
    The job source is read from `project_token.context.source`.

    Raises `PermissionDenied` when the token has no reference, or when the reference lacks an
    organization id or a project id.
    """
    # Guard clause: tokens that are missing or flagged as not counting produce no job.
    if not project_token or not project_token.context.should_count:
        return

    project_ref = project_token.reference
    has_both_ids = (
        project_ref is not None and project_ref.organization_id is not None and project_ref.project_id is not None
    )
    if not has_both_ids:
        raise PermissionDenied

    job = build_job_data(product, kind, project_ref, counter=count, source=project_token.context.source)
    Queue(queue_configuration).put(job)

application_kit.queue.cua

JobHandler

Bases: Protocol

__call__

__call__(job_id, data)

Job Handler call protocol

Source code in application_kit/queue/cua.py
131
132
def __call__(self, job_id: str, data: JobData | None) -> None:
    """
    Job handler call protocol.

    A job handler is called with a job id and that job's data, and returns nothing.

    Args:
        job_id: Identifier of the job passed to the handler.
        data: The job's data; may be `None`.
    """

WorkerBase

WorkerBase(queue, func)

Bases: Thread

Thread executing tasks from a given tasks queue

Source code in application_kit/queue/cua.py
138
139
140
141
142
143
def __init__(self, queue: Queue, func: JobHandler) -> None:
    # The thread is always a daemon; the flag is handed straight to Thread.__init__.
    super().__init__(daemon=True)
    self.func = func
    self.queue = queue
    self.should_exit = False

before_job_hook

before_job_hook()

Runs before the job is run.

Source code in application_kit/queue/cua.py
150
151
def before_job_hook(self) -> None:
    """Hook run before a job is run. This implementation does nothing."""

after_job_hook

after_job_hook()

Runs after the job has been completed.

Source code in application_kit/queue/cua.py
153
154
def after_job_hook(self) -> None:
    """Hook run after a job has been completed. This implementation does nothing."""

ThreadPool

ThreadPool(num_threads, func, config)

Pool of threads consuming tasks from a queue

Source code in application_kit/queue/cua.py
201
202
203
204
205
206
def __init__(self, num_threads: int, func: JobHandler, config: QueueConfig) -> None:
    # One queue, built from the given config, is shared by every worker in the pool.
    self.queue = Queue(config)

    # Build the complete pool before starting anything, so no worker runs
    # while its siblings are still being constructed.
    pool = []
    for _ in range(num_threads):
        pool.append(Worker(self.queue, func))
    self.workers = pool

    for thread in self.workers:
        thread.start()