Skip to content

Commit

Permalink
Merge pull request #25 from cleanenergyexchange/dev
Browse files Browse the repository at this point in the history
chore: unify test env vars handling
  • Loading branch information
davidhuser authored Feb 18, 2025
2 parents 4723aaf + 5aeecc7 commit d24ef4d
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 96 deletions.
48 changes: 27 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,36 @@
from demo_project.dependencies import zitadel_auth
from demo_project.main import app
from fastapi_zitadel_auth import ZitadelAuth
from tests.utils import create_openid_keys, zitadel_issuer, openid_config_url, keys_url
from tests.utils import (
create_openid_keys,
ZITADEL_ISSUER,
openid_config_url,
keys_url,
ZITADEL_CLIENT_ID,
ZITADEL_PROJECT_ID,
)


@pytest.fixture
def fastapi_app():
"""FastAPI app fixture"""
zitadel_auth_overrides = ZitadelAuth(
issuer_url=zitadel_issuer(),
app_client_id="123456789",
project_id="987654321",
issuer_url=ZITADEL_ISSUER,
app_client_id=ZITADEL_CLIENT_ID,
project_id=ZITADEL_PROJECT_ID,
allowed_scopes={"scope1": "Some scope"},
)
app.dependency_overrides[zitadel_auth] = zitadel_auth_overrides
yield


@pytest.fixture(autouse=True)
def blockbuster() -> Iterator[BlockBuster]:
"""Detect blocking calls within an asynchronous event loop"""
with blockbuster_ctx() as bb:
yield bb


@pytest.fixture(autouse=True)
async def reset_openid_config():
"""Reset the OpenID configuration before each test"""
Expand All @@ -36,26 +50,18 @@ async def reset_openid_config():
yield


@pytest.fixture(autouse=True)
def blockbuster() -> Iterator[BlockBuster]:
"""Detect blocking calls within an asynchronous event loop"""
with blockbuster_ctx() as bb:
yield bb


def openid_configuration() -> dict:
"""OpenID configuration fixture"""
zitadel_host = zitadel_issuer()
return {
"issuer": zitadel_host,
"authorization_endpoint": f"{zitadel_host}/oauth/v2/authorize",
"token_endpoint": f"{zitadel_host}/oauth/v2/token",
"introspection_endpoint": f"{zitadel_host}/oauth/v2/introspect",
"userinfo_endpoint": f"{zitadel_host}/oidc/v1/userinfo",
"revocation_endpoint": f"{zitadel_host}/oauth/v2/revoke",
"end_session_endpoint": f"{zitadel_host}/oidc/v1/end_session",
"device_authorization_endpoint": f"{zitadel_host}/oauth/v2/device_authorization",
"jwks_uri": f"{zitadel_host}/oauth/v2/keys",
"issuer": ZITADEL_ISSUER,
"authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/authorize",
"token_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/token",
"introspection_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/introspect",
"userinfo_endpoint": f"{ZITADEL_ISSUER}/oidc/v1/userinfo",
"revocation_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/revoke",
"end_session_endpoint": f"{ZITADEL_ISSUER}/oidc/v1/end_session",
"device_authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/device_authorization",
"jwks_uri": f"{ZITADEL_ISSUER}/oauth/v2/keys",
"scopes_supported": [
"openid",
"profile",
Expand Down
17 changes: 9 additions & 8 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from demo_project.main import app
from fastapi_zitadel_auth import ZitadelAuth
from fastapi_zitadel_auth.token import TokenValidator
from tests.utils import create_test_token, zitadel_primary_domain, zitadel_issuer
from tests.utils import create_test_token, ZITADEL_PRIMARY_DOMAIN, ZITADEL_ISSUER, ZITADEL_PROJECT_ID, ZITADEL_CLIENT_ID


@pytest.mark.asyncio
Expand All @@ -34,18 +34,18 @@ async def test_admin_user(fastapi_app, mock_openid_and_keys):
"access_token": access_token,
"claims": {
"aud": [
"123456789",
"987654321",
ZITADEL_PROJECT_ID,
ZITADEL_CLIENT_ID,
],
"client_id": "123456789",
"client_id": ZITADEL_CLIENT_ID,
"exp": expires,
"iat": issued_at,
"iss": zitadel_issuer(),
"iss": ZITADEL_ISSUER,
"jti": "unique-token-id",
"nbf": issued_at,
"project_roles": {
"admin": {
"role_id": zitadel_primary_domain(),
"role_id": ZITADEL_PRIMARY_DOMAIN,
},
},
"sub": "user123",
Expand Down Expand Up @@ -191,6 +191,7 @@ async def test_malformed_token(fastapi_app, mock_openid_and_keys):
response = await ac.get("/api/protected/admin")
assert response.status_code == 401
assert response.json() == {"detail": {"error": "invalid_token", "message": "Invalid token format"}}
assert response.headers["WWW-Authenticate"] == "Bearer"


async def test_none_token(fastapi_app, mock_openid_and_keys, mocker):
Expand Down Expand Up @@ -276,9 +277,9 @@ async def test_exception_handled(fastapi_app, mock_openid_and_keys, mocker):
assert response.headers["WWW-Authenticate"] == "Bearer"


async def test_change_of_keys_works(fastapi_app, mock_openid_ok_then_empty, freezer):
async def test_keys_expire(fastapi_app, mock_openid_ok_then_empty, freezer):
"""
Test that the keys are fetched again if the current keys are outdated.
Test that the keys are fetched again when they expire and the token is rejected.
"""
async with AsyncClient(
transport=ASGITransport(app=app),
Expand Down
10 changes: 5 additions & 5 deletions tests/test_custom_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
DefaultZitadelClaims,
DefaultZitadelUser,
)
from tests.utils import zitadel_issuer
from tests.utils import ZITADEL_ISSUER


class CustomClaims(JwtClaims):
Expand Down Expand Up @@ -40,7 +40,7 @@ class InvalidUser(BaseModel):
def default_auth():
"""Fixture for default ZitadelAuth instance"""
return ZitadelAuth(
issuer_url=zitadel_issuer(),
issuer_url=ZITADEL_ISSUER,
project_id="project_id",
app_client_id="client_id",
allowed_scopes={
Expand All @@ -55,7 +55,7 @@ def custom_auth():
return ZitadelAuth(
claims_model=CustomClaims,
user_model=CustomUser,
issuer_url=zitadel_issuer(),
issuer_url=ZITADEL_ISSUER,
project_id="project_id",
app_client_id="client_id",
allowed_scopes={
Expand All @@ -80,7 +80,7 @@ def test_invalid_claims_model(self):
with pytest.raises(ValueError, match="claims_model must be a subclass of JwtClaims"):
ZitadelAuth(
claims_model=InvalidClaims, # type: ignore
issuer_url=zitadel_issuer(),
issuer_url=ZITADEL_ISSUER,
project_id="project_id",
app_client_id="client_id",
allowed_scopes={
Expand All @@ -93,7 +93,7 @@ def test_invalid_user_model(self):
with pytest.raises(ValueError, match="user_model must be a subclass of BaseZitadelUser"):
ZitadelAuth(
user_model=InvalidUser, # type: ignore
issuer_url=zitadel_issuer(),
issuer_url=ZITADEL_ISSUER,
project_id="project_id",
app_client_id="client_id",
allowed_scopes={
Expand Down
6 changes: 3 additions & 3 deletions tests/test_openapi_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import openapi_spec_validator

from tests.utils import zitadel_issuer
from tests.utils import ZITADEL_ISSUER

openapi_schema = {
"openapi": "3.1.0",
Expand Down Expand Up @@ -66,8 +66,8 @@
"urn:zitadel:iam:org:project:id:zitadel:aud": "Audience",
"urn:zitadel:iam:org:projects:roles": "Projects roles",
},
"authorizationUrl": f"{zitadel_issuer()}/oauth/v2/authorize",
"tokenUrl": f"{zitadel_issuer()}/oauth/v2/token",
"authorizationUrl": f"{ZITADEL_ISSUER}/oauth/v2/authorize",
"tokenUrl": f"{ZITADEL_ISSUER}/oauth/v2/token",
}
},
}
Expand Down
10 changes: 5 additions & 5 deletions tests/test_openid_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey

from fastapi_zitadel_auth.openid_config import OpenIdConfig
from tests.utils import valid_key, zitadel_issuer, openid_config_url
from tests.utils import valid_key, ZITADEL_ISSUER, openid_config_url


@pytest.fixture
def mock_openid_config():
"""Fixture providing mock OpenID configuration data."""
return {
"issuer": zitadel_issuer(),
"authorization_endpoint": f"{zitadel_issuer()}/oauth/v2/authorize",
"token_endpoint": f"{zitadel_issuer()}/oauth/v2/token",
"jwks_uri": f"{zitadel_issuer()}/oauth/v2/keys",
"issuer": ZITADEL_ISSUER,
"authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/authorize",
"token_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/token",
"jwks_uri": f"{ZITADEL_ISSUER}/oauth/v2/keys",
}


Expand Down
46 changes: 23 additions & 23 deletions tests/test_token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from fastapi_zitadel_auth.exceptions import UnauthorizedException, ForbiddenException
from fastapi_zitadel_auth.token import TokenValidator
from tests.utils import zitadel_issuer
from tests.utils import ZITADEL_ISSUER, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID


@pytest.fixture(scope="module")
Expand All @@ -37,8 +37,8 @@ def valid_token(rsa_keys) -> str:

claims = {
"sub": "user123",
"iss": zitadel_issuer(),
"aud": ["client123", "project123"],
"iss": ZITADEL_ISSUER,
"aud": [ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID],
"exp": now + 3600,
"iat": now,
"nbf": now,
Expand Down Expand Up @@ -185,13 +185,13 @@ def test_verify_valid_token(self, token_validator, valid_token, rsa_keys):
claims = token_validator.verify(
token=valid_token,
key=public_key,
audiences=["client123", "project123"],
issuer=zitadel_issuer(),
audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID],
issuer=ZITADEL_ISSUER,
)

assert claims["sub"] == "user123"
assert claims["iss"] == zitadel_issuer()
assert "client123" in claims["aud"]
assert claims["iss"] == ZITADEL_ISSUER
assert ZITADEL_CLIENT_ID in claims["aud"]

def test_verify_expired_token(self, token_validator, rsa_keys):
"""Test that the TokenValidator raises an exception when verifying an expired token"""
Expand All @@ -200,8 +200,8 @@ def test_verify_expired_token(self, token_validator, rsa_keys):

expired_claims = {
"sub": "user123",
"iss": zitadel_issuer(),
"aud": ["client123"],
"iss": ZITADEL_ISSUER,
"aud": [ZITADEL_CLIENT_ID],
"exp": now - 3600, # Expired 1 hour ago
"iat": now - 7200,
"nbf": now - 7200,
Expand All @@ -220,8 +220,8 @@ def test_verify_expired_token(self, token_validator, rsa_keys):
token_validator.verify(
token=expired_token,
key=public_key,
audiences=["client123"],
issuer=zitadel_issuer(),
audiences=[ZITADEL_CLIENT_ID],
issuer=ZITADEL_ISSUER,
)

def test_verify_invalid_audience(self, token_validator, valid_token, rsa_keys):
Expand All @@ -233,7 +233,7 @@ def test_verify_invalid_audience(self, token_validator, valid_token, rsa_keys):
token=valid_token,
key=public_key,
audiences=["wrong_audience"],
issuer=zitadel_issuer(),
issuer=ZITADEL_ISSUER,
)

def test_verify_invalid_issuer(self, token_validator, valid_token, rsa_keys):
Expand All @@ -244,7 +244,7 @@ def test_verify_invalid_issuer(self, token_validator, valid_token, rsa_keys):
token_validator.verify(
token=valid_token,
key=public_key,
audiences=["client123", "project123"],
audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID],
issuer="https://wrong.issuer.com",
)

Expand All @@ -258,8 +258,8 @@ def test_verify_invalid_signature(self, token_validator, valid_token):
token_validator.verify(
token=valid_token,
key=wrong_key,
audiences=["client123", "project123"],
issuer=zitadel_issuer(),
audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID],
issuer=ZITADEL_ISSUER,
)

def test_verify_not_yet_valid(self, token_validator, rsa_keys):
Expand All @@ -269,8 +269,8 @@ def test_verify_not_yet_valid(self, token_validator, rsa_keys):

future_claims = {
"sub": "user123",
"iss": zitadel_issuer(),
"aud": ["client123"],
"iss": ZITADEL_ISSUER,
"aud": [ZITADEL_CLIENT_ID],
"exp": now + 7200,
"iat": now,
"nbf": now + 3600, # Not valid for another hour
Expand All @@ -289,8 +289,8 @@ def test_verify_not_yet_valid(self, token_validator, rsa_keys):
token_validator.verify(
token=future_token,
key=public_key,
audiences=["client123"],
issuer=zitadel_issuer(),
audiences=[ZITADEL_CLIENT_ID],
issuer=ZITADEL_ISSUER,
)

def test_verify_missing_claims(self, token_validator, rsa_keys):
Expand All @@ -299,8 +299,8 @@ def test_verify_missing_claims(self, token_validator, rsa_keys):
now = int(time.time())

incomplete_claims = {
"iss": zitadel_issuer(),
"aud": ["client123"],
"iss": ZITADEL_ISSUER,
"aud": [ZITADEL_CLIENT_ID],
"exp": now + 3600,
# Missing 'sub' claim
}
Expand All @@ -317,6 +317,6 @@ def test_verify_missing_claims(self, token_validator, rsa_keys):
token_validator.verify(
token=incomplete_token,
key=public_key,
audiences=["client123"],
issuer=zitadel_issuer(),
audiences=[ZITADEL_CLIENT_ID],
issuer=ZITADEL_ISSUER,
)
Loading

0 comments on commit d24ef4d

Please sign in to comment.