Skip to content

Security

application_kit.fastapi.security

FastAPIAuthenticateBase

FastAPIAuthenticateBase(
    classes=None,
    token_lambda=None,
    product=None,
    write_permission_needed=None,
    endpoint_mode=UNSET,
)

Bases: AuthenticateBase

Base class to implement Authentication dependencies.

PARAMETER DESCRIPTION
product

The products required to be activated on the project.

TYPE: Products | None DEFAULT: None

write_permission_needed

Deprecated: use endpoint_mode instead.

TYPE: bool | None DEFAULT: None

endpoint_mode

Controls what kind of permissions are required from the ProjectToken.

TYPE: EndpointMode DEFAULT: UNSET

Source code in application_kit/authenticator/base.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(
    self,
    classes: list[type[CheckablePermission]] | None = None,
    token_lambda: TokenLambda | None = None,
    product: Annotated[Products | None, Doc("The products required to be activated on the project.")] = None,
    write_permission_needed: Annotated[bool | None, Doc("**Deprecated** use endpoint_mode instead.")] = None,
    endpoint_mode: Annotated[
        EndpointMode, Doc("Controls what kind of permissions are required from the ProjectToken.")
    ] = EndpointMode.UNSET,
) -> None:
    """Store the authentication settings and resolve the effective endpoint mode.

    An explicit ``endpoint_mode`` always wins. When it is left at
    ``EndpointMode.UNSET`` the mode is derived from the deprecated
    ``write_permission_needed`` flag: ``None`` -> ``READ_WRITE``,
    ``True`` -> ``WRITE_ONLY``, ``False`` -> ``READ_ONLY``.
    """
    self.classes = classes
    self.token_lambda = token_lambda
    self.product = product
    self.write_permission_needed = write_permission_needed

    # Identity comparisons are deliberate: only the exact singletons
    # None / True / False select a mode from the legacy flag.
    if endpoint_mode != EndpointMode.UNSET:
        self.endpoint_mode = endpoint_mode
    elif write_permission_needed is None:
        self.endpoint_mode = EndpointMode.READ_WRITE
    elif write_permission_needed is True:
        self.endpoint_mode = EndpointMode.WRITE_ONLY
    elif write_permission_needed is False:
        self.endpoint_mode = EndpointMode.READ_ONLY

    # Without permission classes, a write-only endpoint replaces the token
    # lambda with one bound to PRIVATE_KEY only.
    if not self.classes and self.endpoint_mode == EndpointMode.WRITE_ONLY:
        self.token_lambda = partial(get_authorization_token, [PRIVATE_KEY])

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the latter will be retired.

Source code in application_kit/authenticator/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Authenticate an incoming request and build its project token.

    The key/token is exchanged through ``authenticate_request``, the resulting
    JWT is decoded, and the decoded instance is wrapped in a
    ``ProjectTokenModel`` carrying the project reference and request context.
    """
    decoded = decode_jwt(authenticate_request(key_token, kind))
    token_instance = decoded.instance

    reference: ProjectReference
    _, reference = _get_products_and_project_reference(token_instance, organization_id, project_id)

    project_token = ProjectTokenModel(token_instance)
    project_token.reference = reference
    # Requests arriving on port 8000 are flagged as not counted.
    project_token.context.should_count = port != 8000

    # An "ai-context" header whose value is "true" (any casing) overrides
    # the SDK source taken from "x-sdk-source".
    if headers.get("ai-context", "false").lower() == "true":
        project_token.context.source = "ai-context"
    else:
        project_token.context.source = headers.get("x-sdk-source")

    project_token.check_product_allowed(current_product)
    reference.trace_to_datadog()

    assert request_method is not None

    return project_token, decoded

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permission and restriction checks, and returns extra response headers.

Source code in application_kit/authenticator/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Run permission and restriction checks; return extra response headers.

    User tokens go through ``authorize_request`` plus an origin check.
    Key-based tokens are checked against ``self.endpoint_mode`` and the
    instance restrictions, unless permission classes were supplied, in which
    case the deprecated ``authorize_request`` path is used.
    """
    token_instance = project_token.instance

    # User tokens: permission classes (possibly none) plus origin check.
    if token_instance.kind == USER_TOKEN:
        extra_headers = authorize_request(project_token, headers, self.classes or [])
        extra_headers |= token_instance.check_origin(headers)
        return extra_headers

    # Key tokens with permission classes: legacy path, emits a deprecation warning.
    if self.classes:
        warnings.warn(
            "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
            DeprecationWarning,
            stacklevel=4,
        )
        return authorize_request(project_token, headers, self.classes)

    # Key tokens without permission classes: enforce the endpoint mode.
    mode = self.endpoint_mode
    if mode == EndpointMode.READ_WRITE:
        token_instance.check_write_needed(request_method)
    elif mode == EndpointMode.WRITE_ONLY:
        token_instance.check_permissions("stores", ["write"])
    elif mode == EndpointMode.READ_ONLY and request_method not in ("GET", "POST"):
        raise PermissionDenied()

    return token_instance.check_restriction(headers)

AuthenticateKey

AuthenticateKey(product=None, write_permission_needed=None)

Bases: FastAPIAuthenticateBase

AuthenticateKey authenticates a request using key, private_key, X-Api-Key.

Source code in application_kit/fastapi/security.py
95
96
def __init__(self, product: Products | None = None, write_permission_needed: bool | None = None) -> None:
    """Configure key authentication with no permission classes and no token lambda."""
    super().__init__(
        classes=None,
        token_lambda=None,
        product=product,
        write_permission_needed=write_permission_needed,
    )

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the latter will be retired.

Source code in application_kit/authenticator/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Authenticate an incoming request and build its project token.

    The key/token is exchanged through ``authenticate_request``, the resulting
    JWT is decoded, and the decoded instance is wrapped in a
    ``ProjectTokenModel`` carrying the project reference and request context.
    """
    decoded = decode_jwt(authenticate_request(key_token, kind))
    token_instance = decoded.instance

    reference: ProjectReference
    _, reference = _get_products_and_project_reference(token_instance, organization_id, project_id)

    project_token = ProjectTokenModel(token_instance)
    project_token.reference = reference
    # Requests arriving on port 8000 are flagged as not counted.
    project_token.context.should_count = port != 8000

    # An "ai-context" header whose value is "true" (any casing) overrides
    # the SDK source taken from "x-sdk-source".
    if headers.get("ai-context", "false").lower() == "true":
        project_token.context.source = "ai-context"
    else:
        project_token.context.source = headers.get("x-sdk-source")

    project_token.check_product_allowed(current_product)
    reference.trace_to_datadog()

    assert request_method is not None

    return project_token, decoded

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permission and restriction checks, and returns extra response headers.

Source code in application_kit/authenticator/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Run permission and restriction checks; return extra response headers.

    User tokens go through ``authorize_request`` plus an origin check.
    Key-based tokens are checked against ``self.endpoint_mode`` and the
    instance restrictions, unless permission classes were supplied, in which
    case the deprecated ``authorize_request`` path is used.
    """
    token_instance = project_token.instance

    # User tokens: permission classes (possibly none) plus origin check.
    if token_instance.kind == USER_TOKEN:
        extra_headers = authorize_request(project_token, headers, self.classes or [])
        extra_headers |= token_instance.check_origin(headers)
        return extra_headers

    # Key tokens with permission classes: legacy path, emits a deprecation warning.
    if self.classes:
        warnings.warn(
            "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
            DeprecationWarning,
            stacklevel=4,
        )
        return authorize_request(project_token, headers, self.classes)

    # Key tokens without permission classes: enforce the endpoint mode.
    mode = self.endpoint_mode
    if mode == EndpointMode.READ_WRITE:
        token_instance.check_write_needed(request_method)
    elif mode == EndpointMode.WRITE_ONLY:
        token_instance.check_permissions("stores", ["write"])
    elif mode == EndpointMode.READ_ONLY and request_method not in ("GET", "POST"):
        raise PermissionDenied()

    return token_instance.check_restriction(headers)

AuthenticateWritePrivateKey

AuthenticateWritePrivateKey(product=None)

Bases: FastAPIAuthenticateBase

AuthenticateWritePrivateKey authenticates a request using private_key, X-Api-Key; requires write permission.

Source code in application_kit/fastapi/security.py
118
119
def __init__(self, product: Products | None = None) -> None:
    """Configure private-key authentication in ``EndpointMode.WRITE_ONLY`` mode."""
    super().__init__(
        classes=None,
        token_lambda=None,
        product=product,
        endpoint_mode=EndpointMode.WRITE_ONLY,
    )

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the latter will be retired.

Source code in application_kit/authenticator/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Authenticate an incoming request and build its project token.

    The key/token is exchanged through ``authenticate_request``, the resulting
    JWT is decoded, and the decoded instance is wrapped in a
    ``ProjectTokenModel`` carrying the project reference and request context.
    """
    decoded = decode_jwt(authenticate_request(key_token, kind))
    token_instance = decoded.instance

    reference: ProjectReference
    _, reference = _get_products_and_project_reference(token_instance, organization_id, project_id)

    project_token = ProjectTokenModel(token_instance)
    project_token.reference = reference
    # Requests arriving on port 8000 are flagged as not counted.
    project_token.context.should_count = port != 8000

    # An "ai-context" header whose value is "true" (any casing) overrides
    # the SDK source taken from "x-sdk-source".
    if headers.get("ai-context", "false").lower() == "true":
        project_token.context.source = "ai-context"
    else:
        project_token.context.source = headers.get("x-sdk-source")

    project_token.check_product_allowed(current_product)
    reference.trace_to_datadog()

    assert request_method is not None

    return project_token, decoded

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permission and restriction checks, and returns extra response headers.

Source code in application_kit/authenticator/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Run permission and restriction checks; return extra response headers.

    User tokens go through ``authorize_request`` plus an origin check.
    Key-based tokens are checked against ``self.endpoint_mode`` and the
    instance restrictions, unless permission classes were supplied, in which
    case the deprecated ``authorize_request`` path is used.
    """
    token_instance = project_token.instance

    # User tokens: permission classes (possibly none) plus origin check.
    if token_instance.kind == USER_TOKEN:
        extra_headers = authorize_request(project_token, headers, self.classes or [])
        extra_headers |= token_instance.check_origin(headers)
        return extra_headers

    # Key tokens with permission classes: legacy path, emits a deprecation warning.
    if self.classes:
        warnings.warn(
            "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
            DeprecationWarning,
            stacklevel=4,
        )
        return authorize_request(project_token, headers, self.classes)

    # Key tokens without permission classes: enforce the endpoint mode.
    mode = self.endpoint_mode
    if mode == EndpointMode.READ_WRITE:
        token_instance.check_write_needed(request_method)
    elif mode == EndpointMode.WRITE_ONLY:
        token_instance.check_permissions("stores", ["write"])
    elif mode == EndpointMode.READ_ONLY and request_method not in ("GET", "POST"):
        raise PermissionDenied()

    return token_instance.check_restriction(headers)

AuthenticateUser

AuthenticateUser(classes=None, product=None)

Bases: FastAPIAuthenticateBase

AuthenticateUser authenticates a request using the user bearer token found in the Authorization header.

Source code in application_kit/fastapi/security.py
140
141
def __init__(self, classes: list[type[CheckablePermission]] | None = None, product: Products | None = None) -> None:
    """Configure user-token authentication with optional permission classes."""
    super().__init__(
        classes=classes,
        token_lambda=None,
        product=product,
        write_permission_needed=None,
    )

get_project_token

get_project_token(
    current_product,
    headers,
    key_token,
    kind,
    request_method,
    port,
    organization_id=None,
    project_id=None,
)

Calls authentication service to get a token for an incoming request.

PARAMETER DESCRIPTION
headers

The headers mapping from the request

TYPE: Mapping[str, str]

kind

The kind of token detected.

TYPE: PublicKeyType | PrivateKeyType | UserTokenType

organization_id

organization_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

project_id

project_id coming from the request path, used with user tokens.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[ProjectTokenModel, TokenModel]

Returns ProjectTokenModel and a TokenModel, the latter will be retired.

Source code in application_kit/authenticator/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_project_token(
    self,
    current_product: Products | None,
    headers: Annotated[Mapping[str, str], Doc("The headers mapping from the request")],
    key_token: str,
    kind: Annotated[PublicKeyType | PrivateKeyType | UserTokenType, Doc("The kind of token detected.")],
    request_method: str,
    port: int | None,
    organization_id: Annotated[
        int | None, Doc("organization_id coming from the request path, used with user tokens.")
    ] = None,
    project_id: Annotated[
        int | None, Doc("project_id coming from the request path, used with user tokens.")
    ] = None,
) -> Annotated[
    tuple[ProjectTokenModel, TokenModel],
    Doc("Returns ProjectTokenModel and a TokenModel, the later will be retired."),
]:
    """Authenticate an incoming request and build its project token.

    The key/token is exchanged through ``authenticate_request``, the resulting
    JWT is decoded, and the decoded instance is wrapped in a
    ``ProjectTokenModel`` carrying the project reference and request context.
    """
    decoded = decode_jwt(authenticate_request(key_token, kind))
    token_instance = decoded.instance

    reference: ProjectReference
    _, reference = _get_products_and_project_reference(token_instance, organization_id, project_id)

    project_token = ProjectTokenModel(token_instance)
    project_token.reference = reference
    # Requests arriving on port 8000 are flagged as not counted.
    project_token.context.should_count = port != 8000

    # An "ai-context" header whose value is "true" (any casing) overrides
    # the SDK source taken from "x-sdk-source".
    if headers.get("ai-context", "false").lower() == "true":
        project_token.context.source = "ai-context"
    else:
        project_token.context.source = headers.get("x-sdk-source")

    project_token.check_product_allowed(current_product)
    reference.trace_to_datadog()

    assert request_method is not None

    return project_token, decoded

perform_instance_checks

perform_instance_checks(
    project_token, headers, request_method
)

Performs permission and restriction checks, and returns extra response headers.

Source code in application_kit/authenticator/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def perform_instance_checks(
    self,
    project_token: ProjectTokenModel,
    headers: Mapping[str, str],
    request_method: str,
) -> dict[str, str]:
    """Run permission and restriction checks; return extra response headers.

    User tokens go through ``authorize_request`` plus an origin check.
    Key-based tokens are checked against ``self.endpoint_mode`` and the
    instance restrictions, unless permission classes were supplied, in which
    case the deprecated ``authorize_request`` path is used.
    """
    token_instance = project_token.instance

    # User tokens: permission classes (possibly none) plus origin check.
    if token_instance.kind == USER_TOKEN:
        extra_headers = authorize_request(project_token, headers, self.classes or [])
        extra_headers |= token_instance.check_origin(headers)
        return extra_headers

    # Key tokens with permission classes: legacy path, emits a deprecation warning.
    if self.classes:
        warnings.warn(
            "@permission_classes decorator is deprecated for public or private key authentication. see @authenticate_key decorator instead.",
            DeprecationWarning,
            stacklevel=4,
        )
        return authorize_request(project_token, headers, self.classes)

    # Key tokens without permission classes: enforce the endpoint mode.
    mode = self.endpoint_mode
    if mode == EndpointMode.READ_WRITE:
        token_instance.check_write_needed(request_method)
    elif mode == EndpointMode.WRITE_ONLY:
        token_instance.check_permissions("stores", ["write"])
    elif mode == EndpointMode.READ_ONLY and request_method not in ("GET", "POST"):
        raise PermissionDenied()

    return token_instance.check_restriction(headers)

AuthenticationErrorResponse

Bases: BaseModel

AuthenticationErrorResponse is the base authentication error model.