Skip to content

Authentication

application_kit.authenticator.base

Framework-agnostic authentication helpers.

FastAPI security Django decorators

EndpointMode

Bases: str, Enum

EndpointMode controls how an endpoint should require public or private key based on the request verb.

AuthenticateBase

AuthenticateBase(
    classes=None,
    token_lambda=None,
    product=None,
    write_permission_needed=None,
    endpoint_mode=UNSET,
)

Base class that encompasses token fetching and performs instance checks.

PARAMETER DESCRIPTION
product

The products required to be activated on the project.

TYPE: Products | None DEFAULT: None

write_permission_needed

Deprecated: use endpoint_mode instead.

TYPE: bool | None DEFAULT: None

endpoint_mode

Controls what kind of permissions are required from the ProjectToken.

TYPE: EndpointMode DEFAULT: UNSET

Source code in application_kit/authenticator/base.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(
    self,
    classes: list[type[CheckablePermission]] | None = None,
    token_lambda: TokenLambda | None = None,
    product: Annotated[Products | None, Doc("The products required to be activated on the project.")] = None,
    write_permission_needed: Annotated[bool | None, Doc("**Deprecated** use endpoint_mode instead.")] = None,
    endpoint_mode: Annotated[
        EndpointMode, Doc("Controls what kind of permissions are required from the ProjectToken.")
    ] = EndpointMode.UNSET,
) -> None:
    # Plain configuration is stored as given.
    self.classes = classes
    self.token_lambda = token_lambda
    self.product = product
    self.write_permission_needed = write_permission_needed

    # An explicit endpoint_mode always wins. Otherwise the mode is derived
    # from the deprecated write_permission_needed flag. The flag is compared
    # by identity, so only the exact None/True/False singletons select a mode.
    if endpoint_mode != EndpointMode.UNSET:
        self.endpoint_mode = endpoint_mode
    elif write_permission_needed is None:
        self.endpoint_mode = EndpointMode.READ_WRITE
    elif write_permission_needed is True:
        self.endpoint_mode = EndpointMode.WRITE_ONLY
    elif write_permission_needed is False:
        self.endpoint_mode = EndpointMode.READ_ONLY

    # Without permission classes, a write-only endpoint replaces the token
    # lambda with one bound to [PRIVATE_KEY].
    # NOTE(review): presumably this restricts the endpoint to private keys - confirm.
    has_no_permission_classes = self.classes is None or len(self.classes) == 0
    if has_no_permission_classes and self.endpoint_mode == EndpointMode.WRITE_ONLY:
        self.token_lambda = partial(get_authorization_token, [PRIVATE_KEY])

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns a ProjectTokenModel and a TokenModel; the latter will be retired.

Source code in application_kit/authenticator/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Calls authentication service to get a token for an incoming request.

    The key or token is authenticated and decoded, wrapped in a
    `ProjectTokenModel` with its project reference and request context,
    checked against `current_product`, and finally traced.
    """
    # Authenticate the raw key/token, then decode the resulting JWT.
    decoded_token = decode_jwt(authenticate_request(key_token, kind))
    token_instance = decoded_token.instance

    # Only the project reference is used here; the products part is discarded.
    reference: ProjectReference
    _products, reference = _get_products_and_project_reference(token_instance, organization_id, project_id)

    project_token = ProjectTokenModel(token_instance, reference=reference)

    # Requests arriving on port 8000 are not counted.
    project_token.context.should_count = port != 8000

    # An "ai-context: true" header overrides the SDK source header.
    is_ai_context = headers.get("ai-context", "false").lower() == "true"
    if is_ai_context:
        project_token.context.source = "ai-context"
    else:
        project_token.context.source = headers.get("x-sdk-source")

    # The product check runs before the reference is traced.
    project_token.check_product_allowed(current_product)
    reference.trace_to_datadog()

    assert request_method is not None

    return project_token, decoded_token

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permission and restriction checks, and returns extra response headers.

Source code in application_kit/authenticator/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Performs permission and restriction checks, returns extra response headers.

    Three paths exist:

    * user tokens are authorized through the permission classes, then the
      result of the origin check is merged into the returned headers;
    * key tokens with permission classes go through the deprecated
      `authorize_request` path, which emits a `DeprecationWarning`;
    * key tokens without permission classes are checked according to
      `self.endpoint_mode`, then against the instance restrictions.
    """
    token_instance = project_token.instance

    # User tokens: permission classes (possibly none) plus the origin check.
    if token_instance.kind == USER_TOKEN:
        extra_headers = authorize_request(project_token, headers, self.classes or [])
        extra_headers |= token_instance.check_origin(headers)
        return extra_headers

    # Key tokens with permission classes: legacy, deprecated path.
    if self.classes:
        warnings.warn(
            "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
            DeprecationWarning,
            stacklevel=4,
        )
        return authorize_request(project_token, headers, self.classes)

    # Key tokens without permission classes: the endpoint mode drives the check.
    mode = self.endpoint_mode
    if mode == EndpointMode.READ_WRITE:
        token_instance.check_write_needed(request_method)
    elif mode == EndpointMode.WRITE_ONLY:
        token_instance.check_permissions("stores", ["write"])
    elif mode == EndpointMode.READ_ONLY and request_method not in ["GET", "POST"]:
        # NOTE(review): POST is accepted on read-only endpoints - confirm this is intended.
        raise PermissionDenied()

    return token_instance.check_restriction(headers)

get_public_key

get_public_key(parameters, headers)

Gets a public key from request parameters.

Source code in application_kit/authenticator/base.py
207
208
209
210
211
212
213
def get_public_key(parameters: dict[str, str], headers: Mapping[str, str]) -> str | None:
    """
    Gets a public key from request parameters.
    """
    public_key = parameters.get("key")

    return public_key

get_private_key

get_private_key(parameters, headers)

Gets a private key from either the request parameters or headers.

Source code in application_kit/authenticator/base.py
216
217
218
219
220
221
222
223
def get_private_key(parameters: dict[str, str], headers: Mapping[str, str]) -> str | None:
    """Gets a private key from either the request parameters or headers."""
    private_key = parameters.get("private_key")

    if private_key is None:
        private_key = headers.get("X-Api-Key")

    return private_key

get_user_access_token

get_user_access_token(parameters, headers)

Gets a user access token using the Authorization request header.

Source code in application_kit/authenticator/base.py
226
227
228
229
230
231
232
233
234
235
236
237
def get_user_access_token(parameters: dict[str, str], headers: Mapping[str, str]) -> str | None:
    """Gets a user access token using the Authorization request header."""
    authorization_header = headers.get("Authorization")
    access_token = None

    if authorization_header:
        authorization_components = authorization_header.strip().split(" ")

        if len(authorization_components) == 2 and authorization_components[0].lower() == "bearer":
            access_token = authorization_components[1]

    return access_token

application_kit.authenticator.types

RestrictionModel

Bases: ABC, BaseModel

is_fulfilled abstractmethod

is_fulfilled(headers)

is_fulfilled protocol method.

Source code in application_kit/authenticator/types.py
107
108
109
@abc.abstractmethod
def is_fulfilled(self, headers: Mapping[str, str]) -> tuple[bool, dict[str, str]]:
    """is_fulfilled protocol method.

    Abstract: every concrete restriction model must implement it. It receives
    a headers mapping and returns a `(bool, dict[str, str])` pair.

    NOTE(review): presumably the boolean tells whether the restriction is
    fulfilled and the dict carries extra response headers - confirm against
    the implementations.
    """

KeyTokenInstanceModel

Bases: ABC, BaseModel

Base class for Key token instances.

check_restriction abstractmethod

check_restriction(headers)

Checks that at least one restriction is fulfilled.

Source code in application_kit/authenticator/types.py
222
223
224
@abc.abstractmethod
def check_restriction(self, headers: Mapping[str, str]) -> dict[str, str]:
    """Checks that at least one restriction is fulfilled.

    Abstract: every concrete key token instance must implement it. It receives
    a headers mapping and returns a `dict[str, str]`.

    NOTE(review): the returned dict is presumably a set of extra response
    headers, and a failed check presumably raises - confirm against the
    implementations.
    """

check_permissions abstractmethod

check_permissions(product, permission_names)

Checks that the instance has the permission for the product.

Source code in application_kit/authenticator/types.py
226
227
228
@abc.abstractmethod
def check_permissions(self, product: str, permission_names: list[str]) -> None:
    """Checks that the instance has the permission for the product.

    Abstract: every concrete key token instance must implement it. It takes a
    product name and a list of permission names, and returns nothing.

    NOTE(review): a missing permission is presumably signalled by raising
    `PermissionDenied` - confirm against the implementations.
    """

check_write_needed

check_write_needed(method)

Checks if the method requires write permission.

Note

The following methods require write permission: POST, PUT, DELETE and PATCH

PARAMETER DESCRIPTION
method

The http method used by the request.

TYPE: str

Source code in application_kit/authenticator/types.py
230
231
232
233
234
235
236
237
238
239
240
def check_write_needed(self, method: Annotated[str, Doc("The http method used by the request.")]) -> None:
    """Requires the write permission when `method` is a writing HTTP method.

    !!! note
        The following methods require write permission:
        `POST`, `PUT`, `DELETE` and `PATCH`
    """
    if method not in ("POST", "PUT", "DELETE", "PATCH"):
        # Any other method needs no write permission: nothing to check.
        return

    # Write access is expressed as the "write" permission on "stores".
    self.check_permissions("stores", ["write"])

PrivateKeyTokenInstanceModel

Bases: KeyTokenInstanceModel

Token instance for a private key.

Note

Retrieved from authentication service /access_tokens/ endpoint.

check_write_needed

check_write_needed(method)

Checks if the method requires write permission.

Note

The following methods require write permission: POST, PUT, DELETE and PATCH

PARAMETER DESCRIPTION
method

The http method used by the request.

TYPE: str

Source code in application_kit/authenticator/types.py
230
231
232
233
234
235
236
237
238
239
240
def check_write_needed(self, method: Annotated[str, Doc("The http method used by the request.")]) -> None:
    """Requires the write permission when `method` is a writing HTTP method.

    !!! note
        The following methods require write permission:
        `POST`, `PUT`, `DELETE` and `PATCH`
    """
    if method not in ("POST", "PUT", "DELETE", "PATCH"):
        # Any other method needs no write permission: nothing to check.
        return

    # Write access is expressed as the "write" permission on "stores".
    self.check_permissions("stores", ["write"])

check_permissions

check_permissions(product, permission_names)

Checks that the instance has the permission for the product.

Warning

This is now defunct: the only remaining permission is write for the stores product.

This is used by every product, and translates to simply "has write permission".

Source code in application_kit/authenticator/types.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def check_permissions(self, product: str, permission_names: list[str]) -> None:
    """Checks that the instance has the permission for the product.

    Passes as soon as one of `permission_names` is found among the
    permissions stored for `product`. Otherwise the failure is logged at
    debug level and `PermissionDenied` is raised.

    !!! warning
        This is now defunct: the only remaining permission is `write` for the `stores` product.

        This is used by every product, and translates to simply "has write permission".
    """
    granted = self.permissions.get(product, [])

    # One matching permission name is enough.
    if granted and any(name in granted for name in permission_names):
        return

    logger.debug(f"Token has not the permission {self}. Requested permissions : {permission_names} on {product}")
    raise PermissionDenied

PublicKeyTokenInstanceModel

Bases: KeyTokenInstanceModel

Token instance for a public key.

Note

Retrieved from authentication service /access_tokens/ endpoint.

check_write_needed

check_write_needed(method)

Checks if the method requires write permission.

Note

The following methods require write permission: POST, PUT, DELETE and PATCH

PARAMETER DESCRIPTION
method

The http method used by the request.

TYPE: str

Source code in application_kit/authenticator/types.py
230
231
232
233
234
235
236
237
238
239
240
def check_write_needed(self, method: Annotated[str, Doc("The http method used by the request.")]) -> None:
    """Requires the write permission when `method` is a writing HTTP method.

    !!! note
        The following methods require write permission:
        `POST`, `PUT`, `DELETE` and `PATCH`
    """
    if method not in ("POST", "PUT", "DELETE", "PATCH"):
        # Any other method needs no write permission: nothing to check.
        return

    # Write access is expressed as the "write" permission on "stores".
    self.check_permissions("stores", ["write"])

UserTokenInstanceModel

Bases: BaseModel

Token instance for a user.

Note

Retrieved from the bearer token in the Authorization header.

check_staff_or_superuser

check_staff_or_superuser()

Checks if user is staff or super user. :raise PermissionDenied :return:

Source code in application_kit/authenticator/types.py
358
359
360
361
362
363
364
365
def check_staff_or_superuser(self) -> None:
    """
    Checks if the user is staff or superuser.

    :raise PermissionDenied: when the user is neither staff nor superuser.
    """
    if self.is_superuser or self.is_staff:
        return
    raise PermissionDenied

check_superuser

check_superuser()

Checks if user is super user. :raise PermissionDenied

Source code in application_kit/authenticator/types.py
367
368
369
370
371
372
373
def check_superuser(self) -> None:
    """
    Checks if the user is a superuser.

    :raise PermissionDenied: when the user is not a superuser.
    """
    if self.is_superuser:
        return
    raise PermissionDenied

check_membership

check_membership(organization_id)

Checks if user is member of the organization with organization_id. :raise PermissionDenied

Source code in application_kit/authenticator/types.py
375
376
377
378
379
380
381
382
def check_membership(self, organization_id: int) -> None:
    """
    Checks if the user is a member of the organization with `organization_id`.

    The user counts as a member when an organization is set on the token and
    its `pk` equals `organization_id`.

    :raise PermissionDenied: when no organization is set or its `pk` differs.
    """
    organization = self.organization
    is_member = organization is not None and organization.pk == organization_id
    if not is_member:
        raise PermissionDenied

check_ownership

check_ownership(organization_id)

Checks that user has ownership of the organization.

Source code in application_kit/authenticator/types.py
389
390
391
392
393
def check_ownership(self, organization_id: int) -> None:
    """Checks that the user has ownership of the organization.

    Passes only when an organization is set on the token, its `pk` equals
    `organization_id`, and its `is_owner` flag is truthy. Raises
    `PermissionDenied` otherwise.
    """
    organization = self.organization
    owns_organization = (
        organization is not None and organization.pk == organization_id and organization.is_owner
    )
    if not owns_organization:
        raise PermissionDenied

check_origin

check_origin(headers)

Checks if the Origin is allowed for User tokens browser interactions.

Source code in application_kit/authenticator/types.py
395
396
397
398
399
400
401
402
403
404
405
406
def check_origin(self, headers: Mapping[str, str]) -> dict[str, str]:
    """Checks if the Origin is allowed for User tokens browser interactions.

    Returns an `Access-Control-Allow-Origin` header echoing the request
    origin when the origin's hostname is in the CORS whitelist. Returns an
    empty dict when there is no origin header or the hostname is not listed.
    """
    # Imported inside the function, as in the original.
    # NOTE(review): presumably to avoid an import cycle with settings - confirm.
    from application_kit.settings import get_cors_origin_whitelist

    request_origin = headers.get(HTTP_ORIGIN)
    if request_origin is None:
        return {}

    # Only the hostname part of the origin is matched against the whitelist.
    origin_hostname = urlparse(request_origin).hostname
    if origin_hostname not in get_cors_origin_whitelist():
        return {}

    return {"Access-Control-Allow-Origin": request_origin}

GenericCallable

Bases: Protocol

__call__

__call__(*args, **kwargs)

Generic callable takes Any as parameters and returns Any.

Source code in application_kit/authenticator/types.py
428
429
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Generic callable takes Any as parameters and returns Any.

    The body is docstring-only: this is a protocol signature that accepts any
    positional and keyword arguments and places no constraint on the return
    value.
    """

ProjectReference

Bases: BaseModel

Project reference. Encapsulates the organization ID and the project ID coming from authentication.

Tip

Used to identify the targeted project by the request. For Private or Public keys tokens it's direct, but for User tokens it's a bit more involved.

User can target multiple projects or different organizations that's why the endpoints accepting User tokens must have organization_id and project_id path arguments.

organization_id instance-attribute

organization_id

The organization id in the authentication service.

project_id class-attribute instance-attribute

project_id = None

The project id in the authentication service.

trace_to_datadog

trace_to_datadog()

Add project_id, and organization_id tags to the current root span.

Source code in application_kit/authenticator/types.py
447
448
449
450
451
452
453
454
def trace_to_datadog(self) -> None:
    """Add project_id and organization_id tags to the current root span.

    Does nothing when there is no current root span. A tag is only set when
    the corresponding id is truthy, and its value is the id as a string.
    """
    root_span = ddtrace.trace.tracer.current_root_span()
    if not root_span:
        return

    # The tag names match the attribute names on this reference.
    for tag_name in ("organization_id", "project_id"):
        tag_value = getattr(self, tag_name)
        if tag_value:
            root_span.set_tag(tag_name, str(tag_value))

ProjectTokenContext

Context object that avoids putting too many things on the request object.

should_count class-attribute instance-attribute

should_count = True

If the current request should be counted. Internal requests (eg. request Host port is 8000) are excluded from metrics counters.

source class-attribute instance-attribute

source = None

The source, pulled from the X-SDK-Source header, or ai-context if the AI-Context header is set to true.

ProjectTokenModel

ProjectTokenModel(token_instance, reference=None)

This is the main object to interact with when using authentication.

PARAMETER DESCRIPTION
token_instance

The token instance retrieved from authentication service using private key

TYPE: AllTokensModel

reference

The project reference ids.

TYPE: ProjectReference | None DEFAULT: None

Source code in application_kit/authenticator/types.py
483
484
485
486
487
488
489
490
491
492
def __init__(
    self,
    token_instance: Annotated[
        AllTokensModel, Doc("The token instance retrived from authentication service using private key")
    ],
    reference: Annotated[ProjectReference | None, Doc("The project reference ids.")] = None,
) -> None:
    # The token instance is stored as given.
    self.instance = token_instance
    # The project reference is optional and may be assigned after construction.
    self.reference = reference
    # Every project token starts with a fresh context using the context defaults.
    self.context = ProjectTokenContext()

instance instance-attribute

instance = token_instance

The token instance.

reference instance-attribute

reference = reference

The project reference.

context instance-attribute

context = ProjectTokenContext()

The current context, contains source and should_count.

products property

products

Returns the products found in the token.

Warn

User tokens have no products.

check_product_allowed

check_product_allowed(product)

Raises ProductNotAllowed

Source code in application_kit/authenticator/types.py
505
506
507
508
509
510
def check_product_allowed(self, product: Products | None) -> None:
    """Raises ProductNotAllowed when `product` is not among the token's products.

    Nothing is checked when `product` is `None` or when the token exposes no
    products (`self.products` is `None`).
    """
    if product is None:
        return

    token_products = self.products
    if token_products is None:
        return

    if product not in token_products:
        raise ProductNotAllowed(product.value)

validate_products

validate_products(value)

Validates a product list, ignores unknown products

Source code in application_kit/authenticator/types.py
243
244
245
246
247
248
249
250
251
def validate_products(value: list[str]) -> list[Products]:
    """Validates a product list, ignores unknown products.

    Each name is looked up in `Products`. Names that are not members are
    logged at debug level and skipped. Input order is preserved.
    """
    known_products = []
    for product_name in value:
        try:
            member = Products[product_name]
        except KeyError as e:
            logger.debug(f"Unsupported product {e}")
        else:
            known_products.append(member)
    return known_products