Skip to content

Authentication

Application Kit provides authentication for validating API keys and user tokens across all supported frameworks.

Overview

Authentication validates incoming requests and attaches a project_token to the request, containing:

  • Token type (public key, private key, or user token)
  • Organization and project IDs
  • Permissions and allowed domains

Usage

Use the AuthenticateKey dependency:

"""FastAPI authentication example."""

from fastapi import APIRouter, Depends, Request

from application_kit.authenticator.types import Products
from application_kit.fastapi.fastapi import get_fastapi_app
from application_kit.fastapi.security import AuthenticateKey

app = get_fastapi_app("api")
router = APIRouter()
app.include_router(router)


@router.get(
    "/endpoint",
    dependencies=[Depends(AuthenticateKey(product=Products.STORES))],
)
async def endpoint(request: Request) -> dict[str, int]:
    project_token = request.state.project_token
    return {
        "organization_id": project_token.reference.organization_id,
        "project_id": project_token.reference.project_id,
    }

Use the @authenticate_key() or @authenticate_user() decorators:

"""Django authentication example."""

from django.http import HttpRequest, JsonResponse

from application_kit.django.decorators import authenticate_key, authenticate_user
from application_kit.django.request import get_project_token


@authenticate_key()
def public_endpoint(request: HttpRequest) -> JsonResponse:
    # Access the project token via helper function
    project_token = get_project_token(request)
    if project_token.reference:
        return JsonResponse({"org_id": project_token.reference.organization_id})
    return JsonResponse({"org_id": None})


@authenticate_user(classes=[])
def user_endpoint(request: HttpRequest) -> JsonResponse:
    return JsonResponse({"status": "ok"})


@authenticate_key()
async def async_endpoint(request: HttpRequest) -> JsonResponse:
    return JsonResponse({"status": "ok"})

Not recommended for new projects

Use FastAPI for new projects.

Use Shinobi authentication schemes with the @chain() and @terminate() decorators:

"""Django Ninja authentication example."""

from django.http import HttpRequest
from ninja import Router, Schema

from application_kit.django.decorators import authenticate_key
from application_kit.shinobi.api import WoosmapApi
from application_kit.shinobi.authentication import (
    PrivateKeyAuth,
    PrivateKeyHeaderAuth,
    PublicKeyAuth,
)
from application_kit.shinobi.decorators import chain, terminate

api = WoosmapApi(description="My API")
router = Router(tags=["api"])
api.add_router("", router)


class SearchResponse(Schema):
    results: list[str]


@router.get(
    "/search",
    response={200: SearchResponse},
    auth=[PublicKeyAuth(), PrivateKeyAuth(), PrivateKeyHeaderAuth()],
)
@chain(authenticate_key())
@terminate()
def search(request: HttpRequest) -> SearchResponse:
    return SearchResponse(results=["result 1", "result 2"])


@router.get("/search-async", response={200: SearchResponse}, auth=[PublicKeyAuth()])
@chain(authenticate_key())
@terminate()
async def search_async(request: HttpRequest) -> SearchResponse:
    return SearchResponse(results=["result 1", "result 2"])

Permission Classes

Permission classes validate that the authenticated token has the required access level.

Token Type Permissions

Permission Token Type Description
IsUserPermission user token Requires authentication via user credentials (JWT)
IsPublicKeyPermission public key Requires a public API key
IsPrivateKeyPermission private key Requires a private API key

User Permissions

Permission Description
IsStaff User must have staff or superuser status
IsSuperuser User must have superuser status
IsMemberOfCurrentOrganization(lambda) User must be a member of the target organization. Pass None to use organization_id from request path
IsOwnerOfCurrentOrganization(lambda) User must be an owner of the target organization
IsPartner User must be a partner managing the organization

Public Key Permissions

Permission Description
IsOriginAllowed Validates the request origin against the key's allowed domains

Private Key Permissions

Permission Description
HasWritePermissionsOnStores Key must have write permission on stores
HasReadPermissionsOnStores Key must have read permission on stores

Using Permission Classes

"""FastAPI permission classes example."""

from fastapi import APIRouter, Depends

from application_kit.authenticator.types import Products
from application_kit.fastapi.security import AuthenticateKey

router = APIRouter()


# Require write permission
@router.get(
    "/write-endpoint",
    dependencies=[
        Depends(
            AuthenticateKey(
                product=Products.STORES,
                write_permission_needed=True,
            )
        )
    ],
)
async def write_endpoint() -> dict[str, str]:
    return {"status": "ok"}
"""Django permission classes example."""

from django.http import HttpRequest, JsonResponse

from application_kit.authenticator.permissions import (
    HasReadPermissionsOnStores,
    IsStaff,
    Or,
)
from application_kit.django.decorators import authenticate_user


@authenticate_user(classes=[IsStaff])
def staff_only_view(request: HttpRequest) -> JsonResponse:
    return JsonResponse({"status": "ok"})


@authenticate_user(classes=[Or(IsStaff, HasReadPermissionsOnStores)])
def staff_or_read_permission_view(request: HttpRequest) -> JsonResponse:
    # Allows either staff users OR private keys with read permission
    return JsonResponse({"status": "ok"})
"""Django Ninja permission classes example."""

from django.http import HttpRequest
from ninja import Router

from application_kit.django.decorators import authenticate_key
from application_kit.shinobi.authentication import PrivateKeyAuth
from application_kit.shinobi.decorators import chain, terminate

router = Router()


@router.get("/private", auth=[PrivateKeyAuth()])
@chain(authenticate_key())
@terminate()
def private_endpoint(request: HttpRequest) -> dict[str, str]:
    return {"status": "ok"}

Authentication Schemes (Django Ninja)

Shinobi provides authentication scheme classes for Django Ninja's auth parameter:

Scheme Description
PublicKeyAuth Authenticates via public API key in query string
PrivateKeyAuth Authenticates via private API key in query string
PrivateKeyHeaderAuth Authenticates via private API key in X-Api-Key header

Configuration

Authentication requires the following dependencies in your application.json:

{
  "dependencies": {
    "services": [
      {"name": "authentication", "test_value": "http://localhost:34555"}
    ],
    "databases": [
      {"name": "authentication_cache", "type": "redis"}
    ]
  }
}

API Reference

application_kit.authenticator.permissions

IsUserPermission

Bases: BasePermission

Require authentication via user bearer token (JWT).

IsPublicKeyPermission

Bases: BasePermission

Require authentication via public API key (query param key).

IsPrivateKeyPermission

Bases: BasePermission

Require authentication via private API key (query param or X-Api-Key header).

IsStaff

Bases: BasePermission

Require user to have staff or superuser status.

IsSuperuser

Bases: BasePermission

Require user to have superuser status.

IsPartner

Bases: BasePermission

Require user to be a partner managing the organization.

Partners are users who manage organizations on behalf of clients. This permission checks if the user has management rights for the target organization or manages any organizations at all.

IsOriginAllowed

Bases: BasePermission

Validate request origin against the public key's allowed domains.

HasWritePermissionsOnStores

Bases: PrivateKeyPermission

HasReadPermissionsOnStores

Bases: PrivateKeyPermission

Or

Or(*permissions_classes)

Combine multiple permission classes with OR logic.

Returns a permission class that passes if ANY of the provided permission classes pass. Useful for endpoints that accept multiple authentication types.

Example
authenticate_staff_or_member = authenticate_user(
    classes=[Or(IsStaff, IsMemberOfCurrentOrganization(None), IsPartner)]
)
PARAMETER DESCRIPTION
*permissions_classes

Variable number of permission class types to combine.

TYPE: type[CheckablePermission] DEFAULT: ()

RETURNS DESCRIPTION
type[CheckablePermission]

A permission class that passes if any provided class passes.

Source code in application_kit/authenticator/permissions.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def Or(*permissions_classes: type[CheckablePermission]) -> type[CheckablePermission]:  # NOSONAR
    """Combine multiple permission classes with OR logic.

    Returns a permission class that passes if ANY of the provided permission
    classes pass. Useful for endpoints that accept multiple authentication types.

    Example:
        ```python
        authenticate_staff_or_member = authenticate_user(
            classes=[Or(IsStaff, IsMemberOfCurrentOrganization(None), IsPartner)]
        )
        ```

    Args:
        *permissions_classes: Variable number of permission class types to combine.

    Returns:
        A permission class that passes if any provided class passes.
    """

    class _Or(CheckablePermission):
        @classmethod
        def check(
            cls,
            project_token: ProjectTokenModel,
            headers: Mapping[str, str],
        ) -> dict[str, str]:
            exception = None
            extra_kwargs = {}

            for permission_class in permissions_classes:
                try:
                    response_headers = permission_class.check(project_token, headers)
                    extra_kwargs.update(response_headers)
                except HttpException as e:
                    extra_kwargs = {}
                    exception = e
                else:
                    exception = None
                    break

            if exception is not None:
                raise exception

            return extra_kwargs

    return _Or

IsMemberOfCurrentOrganization

IsMemberOfCurrentOrganization(org_lambda)

Require user to be a member of the target organization.

PARAMETER DESCRIPTION
org_lambda

A callable that extracts organization_id from ProjectReference, or None to use project_token.reference.organization_id directly.

TYPE: Any

Example
# Use organization_id from request path parameters
@authenticate_user(classes=[IsMemberOfCurrentOrganization(None)])
def my_view(request, organization_id: int):
    ...

# Use custom lambda to extract org from reference
@authenticate_user(classes=[IsMemberOfCurrentOrganization(lambda ref: ref.custom_org_id)])
def my_view(request):
    ...
Source code in application_kit/authenticator/permissions.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def IsMemberOfCurrentOrganization(org_lambda: Any) -> type[CheckablePermission]:  # NOSONAR
    """Require user to be a member of the target organization.

    Args:
        org_lambda: A callable that extracts organization_id from ProjectReference,
            or `None` to use `project_token.reference.organization_id` directly.

    Example:
        ```python
        # Use organization_id from request path parameters
        @authenticate_user(classes=[IsMemberOfCurrentOrganization(None)])
        def my_view(request, organization_id: int):
            ...

        # Use custom lambda to extract org from reference
        @authenticate_user(classes=[IsMemberOfCurrentOrganization(lambda ref: ref.custom_org_id)])
        def my_view(request):
            ...
        ```
    """

    class _IsMemberOfCurrentOrganization(BasePermission):
        @classmethod
        def check(cls, project_token: ProjectTokenModel, headers: Mapping[str, str]) -> dict[str, str]:
            project_ref = project_token.reference
            token = project_token.instance
            if token.kind == USER_TOKEN and project_ref is not None:
                pk = project_ref.organization_id or org_lambda(project_ref)
                token.check_membership(pk)
                return {}

            raise PermissionDenied

    return _IsMemberOfCurrentOrganization

IsOwnerOfCurrentOrganization

IsOwnerOfCurrentOrganization(org_lambda)

Require user to be an owner of the target organization.

Owners have full administrative rights over the organization.

PARAMETER DESCRIPTION
org_lambda

Currently unused, but kept for API consistency with IsMemberOfCurrentOrganization. Pass None.

TYPE: Any

Source code in application_kit/authenticator/permissions.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def IsOwnerOfCurrentOrganization(org_lambda: Any) -> type[CheckablePermission]:  # NOSONAR
    """Require user to be an owner of the target organization.

    Owners have full administrative rights over the organization.

    Args:
        org_lambda: Currently unused, but kept for API consistency with
            `IsMemberOfCurrentOrganization`. Pass `None`.
    """

    class _IsOwnerOfCurrentOrganization(BasePermission):
        @classmethod
        def check(cls, project_token: ProjectTokenModel, headers: Mapping[str, str]) -> dict[str, str]:
            if (
                project_token.reference is not None
                and project_token.instance.kind == USER_TOKEN
                and project_token.reference.organization_id is not None
            ):
                pk = project_token.reference.organization_id
                project_token.instance.check_ownership(pk)
                return {}
            raise PermissionDenied()

    return _IsOwnerOfCurrentOrganization

FastAPI Security Dependencies

application_kit.fastapi.security

AuthenticateKey

AuthenticateKey(product=None, write_permission_needed=None)

Bases: FastAPIAuthenticateBase

Authenticate requests using public or private API keys.

Accepts authentication via: - Query parameter key (public key) - Query parameter private_key (private key) - Header X-Api-Key (private key)

After authentication, request.state.project_token contains the validated token.

PARAMETER DESCRIPTION
product

Required product for the endpoint (e.g., Products.STORES).

TYPE: Products | None DEFAULT: None

write_permission_needed

If True, requires private key with write permission.

TYPE: bool | None DEFAULT: None

Example
@router.get(
    "/endpoint",
    dependencies=[Depends(AuthenticateKey(product=Products.STORES))],
)
async def endpoint(request: Request):
    project_token = request.state.project_token
    return {"org_id": project_token.reference.organization_id}
Source code in application_kit/fastapi/security.py
118
119
def __init__(self, product: Products | None = None, write_permission_needed: bool | None = None) -> None:
    super().__init__(None, None, product, write_permission_needed)

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the later will be retired.

Source code in application_kit/authenticator/base.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Calls authentication service to get a token for an incoming request."""
    token = authenticate_request(key_token, kind)

    readable_token = decode_jwt(token)

    project_reference: ProjectReference

    instance = readable_token.instance
    _, project_reference = _get_products_and_project_reference(instance, organization_id, project_id)
    project_token = ProjectTokenModel(instance)
    project_token.reference = project_reference
    project_token.context.should_count = port != 8000

    aicontext = headers.get("ai-context", "false").lower() == "true"
    project_token.context.source = headers.get("x-sdk-source") if aicontext is False else "ai-context"

    project_token.check_product_allowed(current_product)

    project_reference.trace_to_datadog()

    assert request_method is not None

    return project_token, readable_token

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permissions, and restrictions checks, returns extra response headers.

Source code in application_kit/authenticator/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Performs permissions, and restrictions checks, returns extra response headers."""

    instance = project_token.instance

    if instance.kind != USER_TOKEN:
        if not self.classes or len(self.classes) == 0:
            match self.endpoint_mode:
                case EndpointMode.READ_WRITE:
                    instance.check_write_needed(request_method)
                case EndpointMode.WRITE_ONLY:
                    instance.check_permissions("stores", ["write"])
                case EndpointMode.READ_ONLY:
                    if request_method not in ["GET", "POST"]:
                        raise PermissionDenied()

            permissions_extra_headers = instance.check_restriction(headers)
        else:
            warnings.warn(
                "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
                DeprecationWarning,
                stacklevel=4,
            )
            permissions_extra_headers = authorize_request(project_token, headers, self.classes)
    else:
        permissions_extra_headers = authorize_request(project_token, headers, self.classes or [])
        if instance.kind == USER_TOKEN:
            permissions_extra_headers |= instance.check_origin(headers)
    return permissions_extra_headers

AuthenticateWritePrivateKey

AuthenticateWritePrivateKey(product=None)

Bases: FastAPIAuthenticateBase

Authenticate requests using private API key with write permission.

Use this for endpoints that modify data. Only accepts private keys that have write permission enabled.

Accepts authentication via: - Query parameter private_key - Header X-Api-Key

PARAMETER DESCRIPTION
product

Required product for the endpoint (e.g., Products.DATASETS).

TYPE: Products | None DEFAULT: None

Example
@router.delete(
    "/resource/{id}",
    dependencies=[Depends(AuthenticateWritePrivateKey(product=Products.STORES))],
)
async def delete_resource(id: int):
    ...
Source code in application_kit/fastapi/security.py
162
163
def __init__(self, product: Products | None = None) -> None:
    super().__init__(None, None, product, endpoint_mode=EndpointMode.WRITE_ONLY)

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the later will be retired.

Source code in application_kit/authenticator/base.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Calls authentication service to get a token for an incoming request."""
    token = authenticate_request(key_token, kind)

    readable_token = decode_jwt(token)

    project_reference: ProjectReference

    instance = readable_token.instance
    _, project_reference = _get_products_and_project_reference(instance, organization_id, project_id)
    project_token = ProjectTokenModel(instance)
    project_token.reference = project_reference
    project_token.context.should_count = port != 8000

    aicontext = headers.get("ai-context", "false").lower() == "true"
    project_token.context.source = headers.get("x-sdk-source") if aicontext is False else "ai-context"

    project_token.check_product_allowed(current_product)

    project_reference.trace_to_datadog()

    assert request_method is not None

    return project_token, readable_token

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permissions, and restrictions checks, returns extra response headers.

Source code in application_kit/authenticator/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Performs permissions, and restrictions checks, returns extra response headers."""

    instance = project_token.instance

    if instance.kind != USER_TOKEN:
        if not self.classes or len(self.classes) == 0:
            match self.endpoint_mode:
                case EndpointMode.READ_WRITE:
                    instance.check_write_needed(request_method)
                case EndpointMode.WRITE_ONLY:
                    instance.check_permissions("stores", ["write"])
                case EndpointMode.READ_ONLY:
                    if request_method not in ["GET", "POST"]:
                        raise PermissionDenied()

            permissions_extra_headers = instance.check_restriction(headers)
        else:
            warnings.warn(
                "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
                DeprecationWarning,
                stacklevel=4,
            )
            permissions_extra_headers = authorize_request(project_token, headers, self.classes)
    else:
        permissions_extra_headers = authorize_request(project_token, headers, self.classes or [])
        if instance.kind == USER_TOKEN:
            permissions_extra_headers |= instance.check_origin(headers)
    return permissions_extra_headers

AuthenticateUser

AuthenticateUser(classes=None, product=None)

Bases: FastAPIAuthenticateBase

Authenticate requests using user JWT bearer token.

Accepts authentication via the Authorization: Bearer <token> header. Use with permission classes to enforce organization membership or role requirements.

PARAMETER DESCRIPTION
classes

List of permission classes to enforce (e.g., [IsStaff]).

TYPE: list[type[CheckablePermission]] | None DEFAULT: None

product

Required product for the endpoint.

TYPE: Products | None DEFAULT: None

Example
from application_kit.authenticator.permissions import IsMemberOfCurrentOrganization, Or, IsStaff

@router.get(
    "/org/{organization_id}/data",
    dependencies=[
        Depends(AuthenticateUser(classes=[Or(IsStaff, IsMemberOfCurrentOrganization(None))]))
    ],
)
async def get_org_data(organization_id: int):
    ...
Source code in application_kit/fastapi/security.py
206
207
def __init__(self, classes: list[type[CheckablePermission]] | None = None, product: Products | None = None) -> None:
    super().__init__(classes, None, product, None)

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the later will be retired.

Source code in application_kit/authenticator/base.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Calls authentication service to get a token for an incoming request."""
    token = authenticate_request(key_token, kind)

    readable_token = decode_jwt(token)

    project_reference: ProjectReference

    instance = readable_token.instance
    _, project_reference = _get_products_and_project_reference(instance, organization_id, project_id)
    project_token = ProjectTokenModel(instance)
    project_token.reference = project_reference
    project_token.context.should_count = port != 8000

    aicontext = headers.get("ai-context", "false").lower() == "true"
    project_token.context.source = headers.get("x-sdk-source") if aicontext is False else "ai-context"

    project_token.check_product_allowed(current_product)

    project_reference.trace_to_datadog()

    assert request_method is not None

    return project_token, readable_token

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permissions, and restrictions checks, returns extra response headers.

Source code in application_kit/authenticator/base.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Performs permissions, and restrictions checks, returns extra response headers."""

    instance = project_token.instance

    if instance.kind != USER_TOKEN:
        if not self.classes or len(self.classes) == 0:
            match self.endpoint_mode:
                case EndpointMode.READ_WRITE:
                    instance.check_write_needed(request_method)
                case EndpointMode.WRITE_ONLY:
                    instance.check_permissions("stores", ["write"])
                case EndpointMode.READ_ONLY:
                    if request_method not in ["GET", "POST"]:
                        raise PermissionDenied()

            permissions_extra_headers = instance.check_restriction(headers)
        else:
            warnings.warn(
                "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
                DeprecationWarning,
                stacklevel=4,
            )
            permissions_extra_headers = authorize_request(project_token, headers, self.classes)
    else:
        permissions_extra_headers = authorize_request(project_token, headers, self.classes or [])
        if instance.kind == USER_TOKEN:
            permissions_extra_headers |= instance.check_origin(headers)
    return permissions_extra_headers