From 5aeecc7c91981c620e56f8e0f2becd1e86efdb70 Mon Sep 17 00:00:00 2001 From: David Huser Date: Tue, 18 Feb 2025 08:32:54 +0100 Subject: [PATCH] chore: unify test env vars handling --- tests/conftest.py | 48 ++++++++++++++++++++--------------- tests/test_auth.py | 17 +++++++------ tests/test_custom_models.py | 10 ++++---- tests/test_openapi_scheme.py | 6 ++--- tests/test_openid_config.py | 10 ++++---- tests/test_token_validator.py | 46 ++++++++++++++++----------------- tests/test_user.py | 22 ++++++++-------- tests/utils.py | 37 +++++++++++++-------------- 8 files changed, 100 insertions(+), 96 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index c23159b..054524a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,22 +12,36 @@ from demo_project.dependencies import zitadel_auth from demo_project.main import app from fastapi_zitadel_auth import ZitadelAuth -from tests.utils import create_openid_keys, zitadel_issuer, openid_config_url, keys_url +from tests.utils import ( + create_openid_keys, + ZITADEL_ISSUER, + openid_config_url, + keys_url, + ZITADEL_CLIENT_ID, + ZITADEL_PROJECT_ID, +) @pytest.fixture def fastapi_app(): """FastAPI app fixture""" zitadel_auth_overrides = ZitadelAuth( - issuer_url=zitadel_issuer(), - app_client_id="123456789", - project_id="987654321", + issuer_url=ZITADEL_ISSUER, + app_client_id=ZITADEL_CLIENT_ID, + project_id=ZITADEL_PROJECT_ID, allowed_scopes={"scope1": "Some scope"}, ) app.dependency_overrides[zitadel_auth] = zitadel_auth_overrides yield +@pytest.fixture(autouse=True) +def blockbuster() -> Iterator[BlockBuster]: + """Detect blocking calls within an asynchronous event loop""" + with blockbuster_ctx() as bb: + yield bb + + @pytest.fixture(autouse=True) async def reset_openid_config(): """Reset the OpenID configuration before each test""" @@ -36,26 +50,18 @@ async def reset_openid_config(): yield -@pytest.fixture(autouse=True) -def blockbuster() -> Iterator[BlockBuster]: - """Detect blocking calls within an asynchronous event loop""" - with blockbuster_ctx() as bb: - yield bb - - def openid_configuration() -> dict: """OpenID configuration fixture""" - zitadel_host = zitadel_issuer() return { - "issuer": zitadel_host, - "authorization_endpoint": f"{zitadel_host}/oauth/v2/authorize", - "token_endpoint": f"{zitadel_host}/oauth/v2/token", - "introspection_endpoint": f"{zitadel_host}/oauth/v2/introspect", - "userinfo_endpoint": f"{zitadel_host}/oidc/v1/userinfo", - "revocation_endpoint": f"{zitadel_host}/oauth/v2/revoke", - "end_session_endpoint": f"{zitadel_host}/oidc/v1/end_session", - "device_authorization_endpoint": f"{zitadel_host}/oauth/v2/device_authorization", - "jwks_uri": f"{zitadel_host}/oauth/v2/keys", + "issuer": ZITADEL_ISSUER, + "authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/authorize", + "token_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/token", + "introspection_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/introspect", + "userinfo_endpoint": f"{ZITADEL_ISSUER}/oidc/v1/userinfo", + "revocation_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/revoke", + "end_session_endpoint": f"{ZITADEL_ISSUER}/oidc/v1/end_session", + "device_authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/device_authorization", + "jwks_uri": f"{ZITADEL_ISSUER}/oauth/v2/keys", "scopes_supported": [ "openid", "profile", diff --git a/tests/test_auth.py b/tests/test_auth.py index 22d13a0..735a141 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -11,7 +11,7 @@ from demo_project.main import app from fastapi_zitadel_auth import ZitadelAuth from fastapi_zitadel_auth.token import TokenValidator -from tests.utils import create_test_token, zitadel_primary_domain, zitadel_issuer +from tests.utils import create_test_token, ZITADEL_PRIMARY_DOMAIN, ZITADEL_ISSUER, ZITADEL_PROJECT_ID, ZITADEL_CLIENT_ID @pytest.mark.asyncio @@ -34,18 +34,18 @@ async def test_admin_user(fastapi_app, mock_openid_and_keys): "access_token": access_token, "claims": { "aud": [ - "123456789", - "987654321", + ZITADEL_PROJECT_ID, + ZITADEL_CLIENT_ID, ], - "client_id": "123456789", + "client_id": ZITADEL_CLIENT_ID, "exp": expires, "iat": issued_at, - "iss": zitadel_issuer(), + "iss": ZITADEL_ISSUER, "jti": "unique-token-id", "nbf": issued_at, "project_roles": { "admin": { - "role_id": zitadel_primary_domain(), + "role_id": ZITADEL_PRIMARY_DOMAIN, }, }, "sub": "user123", @@ -191,6 +191,7 @@ async def test_malformed_token(fastapi_app, mock_openid_and_keys): response = await ac.get("/api/protected/admin") assert response.status_code == 401 assert response.json() == {"detail": {"error": "invalid_token", "message": "Invalid token format"}} + assert response.headers["WWW-Authenticate"] == "Bearer" async def test_none_token(fastapi_app, mock_openid_and_keys, mocker): @@ -276,9 +277,9 @@ async def test_exception_handled(fastapi_app, mock_openid_and_keys, mocker): assert response.headers["WWW-Authenticate"] == "Bearer" -async def test_change_of_keys_works(fastapi_app, mock_openid_ok_then_empty, freezer): +async def test_keys_expire(fastapi_app, mock_openid_ok_then_empty, freezer): """ - Test that the keys are fetched again if the current keys are outdated. + Test that the keys are fetched again when they expire and the token is rejected. """ async with AsyncClient( transport=ASGITransport(app=app), diff --git a/tests/test_custom_models.py b/tests/test_custom_models.py index ed623c2..644214c 100644 --- a/tests/test_custom_models.py +++ b/tests/test_custom_models.py @@ -8,7 +8,7 @@ DefaultZitadelClaims, DefaultZitadelUser, ) -from tests.utils import zitadel_issuer +from tests.utils import ZITADEL_ISSUER class CustomClaims(JwtClaims): @@ -40,7 +40,7 @@ class InvalidUser(BaseModel): def default_auth(): """Fixture for default ZitadelAuth instance""" return ZitadelAuth( - issuer_url=zitadel_issuer(), + issuer_url=ZITADEL_ISSUER, project_id="project_id", app_client_id="client_id", allowed_scopes={ @@ -55,7 +55,7 @@ def custom_auth(): return ZitadelAuth( claims_model=CustomClaims, user_model=CustomUser, - issuer_url=zitadel_issuer(), + issuer_url=ZITADEL_ISSUER, project_id="project_id", app_client_id="client_id", allowed_scopes={ @@ -80,7 +80,7 @@ def test_invalid_claims_model(self): with pytest.raises(ValueError, match="claims_model must be a subclass of JwtClaims"): ZitadelAuth( claims_model=InvalidClaims, # type: ignore - issuer_url=zitadel_issuer(), + issuer_url=ZITADEL_ISSUER, project_id="project_id", app_client_id="client_id", allowed_scopes={ @@ -93,7 +93,7 @@ def test_invalid_user_model(self): with pytest.raises(ValueError, match="user_model must be a subclass of BaseZitadelUser"): ZitadelAuth( user_model=InvalidUser, # type: ignore - issuer_url=zitadel_issuer(), + issuer_url=ZITADEL_ISSUER, project_id="project_id", app_client_id="client_id", allowed_scopes={ diff --git a/tests/test_openapi_scheme.py b/tests/test_openapi_scheme.py index b63fc41..4fecefc 100644 --- a/tests/test_openapi_scheme.py +++ b/tests/test_openapi_scheme.py @@ -4,7 +4,7 @@ import openapi_spec_validator -from tests.utils import zitadel_issuer +from tests.utils import ZITADEL_ISSUER openapi_schema = { "openapi": "3.1.0", @@ -66,8 +66,8 @@ "urn:zitadel:iam:org:project:id:zitadel:aud": "Audience", "urn:zitadel:iam:org:projects:roles": "Projects roles", }, - "authorizationUrl": f"{zitadel_issuer()}/oauth/v2/authorize", - "tokenUrl": f"{zitadel_issuer()}/oauth/v2/token", + "authorizationUrl": f"{ZITADEL_ISSUER}/oauth/v2/authorize", + "tokenUrl": f"{ZITADEL_ISSUER}/oauth/v2/token", } }, } diff --git a/tests/test_openid_config.py b/tests/test_openid_config.py index a3d4676..c99d94d 100644 --- a/tests/test_openid_config.py +++ b/tests/test_openid_config.py @@ -7,17 +7,17 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from fastapi_zitadel_auth.openid_config import OpenIdConfig -from tests.utils import valid_key, zitadel_issuer, openid_config_url +from tests.utils import valid_key, ZITADEL_ISSUER, openid_config_url @pytest.fixture def mock_openid_config(): """Fixture providing mock OpenID configuration data.""" return { - "issuer": zitadel_issuer(), - "authorization_endpoint": f"{zitadel_issuer()}/oauth/v2/authorize", - "token_endpoint": f"{zitadel_issuer()}/oauth/v2/token", - "jwks_uri": f"{zitadel_issuer()}/oauth/v2/keys", + "issuer": ZITADEL_ISSUER, + "authorization_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/authorize", + "token_endpoint": f"{ZITADEL_ISSUER}/oauth/v2/token", + "jwks_uri": f"{ZITADEL_ISSUER}/oauth/v2/keys", } diff --git a/tests/test_token_validator.py b/tests/test_token_validator.py index f8beac6..0592afd 100644 --- a/tests/test_token_validator.py +++ b/tests/test_token_validator.py @@ -12,7 +12,7 @@ from fastapi_zitadel_auth.exceptions import UnauthorizedException, ForbiddenException from fastapi_zitadel_auth.token import TokenValidator -from tests.utils import zitadel_issuer +from tests.utils import ZITADEL_ISSUER, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID @pytest.fixture(scope="module") @@ -37,8 +37,8 @@ def valid_token(rsa_keys) -> str: claims = { "sub": "user123", - "iss": zitadel_issuer(), - "aud": ["client123", "project123"], + "iss": ZITADEL_ISSUER, + "aud": [ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID], "exp": now + 3600, "iat": now, "nbf": now, @@ -185,13 +185,13 @@ def test_verify_valid_token(self, token_validator, valid_token, rsa_keys): claims = token_validator.verify( token=valid_token, key=public_key, - audiences=["client123", "project123"], - issuer=zitadel_issuer(), + audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID], + issuer=ZITADEL_ISSUER, ) assert claims["sub"] == "user123" - assert claims["iss"] == zitadel_issuer() - assert "client123" in claims["aud"] + assert claims["iss"] == ZITADEL_ISSUER + assert ZITADEL_CLIENT_ID in claims["aud"] def test_verify_expired_token(self, token_validator, rsa_keys): """Test that the TokenValidator raises an exception when verifying an expired token""" @@ -200,8 +200,8 @@ def test_verify_expired_token(self, token_validator, rsa_keys): expired_claims = { "sub": "user123", - "iss": zitadel_issuer(), - "aud": ["client123"], + "iss": ZITADEL_ISSUER, + "aud": [ZITADEL_CLIENT_ID], "exp": now - 3600, # Expired 1 hour ago "iat": now - 7200, "nbf": now - 7200, @@ -220,8 +220,8 @@ def test_verify_expired_token(self, token_validator, rsa_keys): token_validator.verify( token=expired_token, key=public_key, - audiences=["client123"], - issuer=zitadel_issuer(), + audiences=[ZITADEL_CLIENT_ID], + issuer=ZITADEL_ISSUER, ) def test_verify_invalid_audience(self, token_validator, valid_token, rsa_keys): @@ -233,7 +233,7 @@ def test_verify_invalid_audience(self, token_validator, valid_token, rsa_keys): token=valid_token, key=public_key, audiences=["wrong_audience"], - issuer=zitadel_issuer(), + issuer=ZITADEL_ISSUER, ) def test_verify_invalid_issuer(self, token_validator, valid_token, rsa_keys): @@ -244,7 +244,7 @@ def test_verify_invalid_issuer(self, token_validator, valid_token, rsa_keys): token_validator.verify( token=valid_token, key=public_key, - audiences=["client123", "project123"], + audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID], issuer="https://wrong.issuer.com", ) @@ -258,8 +258,8 @@ def test_verify_invalid_signature(self, token_validator, valid_token): token_validator.verify( token=valid_token, key=wrong_key, - audiences=["client123", "project123"], - issuer=zitadel_issuer(), + audiences=[ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID], + issuer=ZITADEL_ISSUER, ) def test_verify_not_yet_valid(self, token_validator, rsa_keys): @@ -269,8 +269,8 @@ def test_verify_not_yet_valid(self, token_validator, rsa_keys): future_claims = { "sub": "user123", - "iss": zitadel_issuer(), - "aud": ["client123"], + "iss": ZITADEL_ISSUER, + "aud": [ZITADEL_CLIENT_ID], "exp": now + 7200, "iat": now, "nbf": now + 3600, # Not valid for another hour @@ -289,8 +289,8 @@ def test_verify_not_yet_valid(self, token_validator, rsa_keys): token_validator.verify( token=future_token, key=public_key, - audiences=["client123"], - issuer=zitadel_issuer(), + audiences=[ZITADEL_CLIENT_ID], + issuer=ZITADEL_ISSUER, ) def test_verify_missing_claims(self, token_validator, rsa_keys): @@ -299,8 +299,8 @@ def test_verify_missing_claims(self, token_validator, rsa_keys): now = int(time.time()) incomplete_claims = { - "iss": zitadel_issuer(), - "aud": ["client123"], + "iss": ZITADEL_ISSUER, + "aud": [ZITADEL_CLIENT_ID], "exp": now + 3600, # Missing 'sub' claim } @@ -317,6 +317,6 @@ def test_verify_missing_claims(self, token_validator, rsa_keys): token_validator.verify( token=incomplete_token, key=public_key, - audiences=["client123"], - issuer=zitadel_issuer(), + audiences=[ZITADEL_CLIENT_ID], + issuer=ZITADEL_ISSUER, ) diff --git a/tests/test_user.py b/tests/test_user.py index 65884fd..793b021 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -12,10 +12,8 @@ DefaultZitadelClaims, DefaultZitadelUser, ) -from tests.utils import zitadel_issuer, zitadel_primary_domain +from tests.utils import ZITADEL_ISSUER, ZITADEL_PRIMARY_DOMAIN, ZITADEL_CLIENT_ID, ZITADEL_PROJECT_ID -client_id = "client1" -project_id = "123456789" role_key = "role1" role_id = "295621089671959405" sub = "22222222222222222222" @@ -26,11 +24,11 @@ def valid_claims_data() -> dict: """Fixture providing valid JWT claims data.""" now = int(time.time()) return { - "aud": [project_id], - "client_id": client_id, + "aud": [ZITADEL_PROJECT_ID], + "client_id": ZITADEL_CLIENT_ID, "exp": now + 3600, "iat": now, - "iss": zitadel_issuer(), + "iss": ZITADEL_ISSUER, "sub": sub, "nbf": now, "jti": "unique-token-id", @@ -41,14 +39,14 @@ def valid_claims_data() -> dict: def valid_claims_with_project_roles(valid_claims_data): """Fixture providing claims data with Zitadel project roles.""" data = valid_claims_data.copy() - data[f"urn:zitadel:iam:org:project:{project_id}:roles"] = {role_key: {role_id: zitadel_primary_domain()}} + data[f"urn:zitadel:iam:org:project:{ZITADEL_PROJECT_ID}:roles"] = {role_key: {role_id: ZITADEL_PRIMARY_DOMAIN}} return data class TestBaseZitadelClaims: """Test suite for JwtClaims model.""" - @pytest.mark.parametrize("aud", [[project_id], ["audience1", "audience2"]]) + @pytest.mark.parametrize("aud", [[ZITADEL_PROJECT_ID], ["audience1", "audience2"]]) def test_valid_audience_formats(self, valid_claims_data, aud): """Test that list audience formats are accepted.""" data = valid_claims_data.copy() @@ -86,7 +84,7 @@ class TestDefaultZitadelClaims: def test_project_roles_extraction(self, valid_claims_with_project_roles): """Test extraction of project roles from Zitadel-specific claim.""" claims = DefaultZitadelClaims(**valid_claims_with_project_roles) - assert claims.project_roles == {role_key: {role_id: zitadel_primary_domain()}} + assert claims.project_roles == {role_key: {role_id: ZITADEL_PRIMARY_DOMAIN}} def test_missing_project_roles(self, valid_claims_data): """Test handling of missing project roles.""" @@ -96,10 +94,12 @@ def test_missing_project_roles(self, valid_claims_data): def test_different_project_roles(self, valid_claims_data): """Test extraction of project roles with different role values.""" data = valid_claims_data.copy() - data[f"urn:zitadel:iam:org:project:{project_id}:roles"] = {"role2": {"123456789": zitadel_primary_domain()}} + data[f"urn:zitadel:iam:org:project:{ZITADEL_PROJECT_ID}:roles"] = { + "role2": {"123456789": ZITADEL_PRIMARY_DOMAIN} + } claims = DefaultZitadelClaims(**data) - assert claims.project_roles == {"role2": {"123456789": zitadel_primary_domain()}} + assert claims.project_roles == {"role2": {"123456789": ZITADEL_PRIMARY_DOMAIN}} class TestDefaultZitadelUser: diff --git a/tests/utils.py b/tests/utils.py index a9500fd..5e82721 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ Test utilities """ +import os from datetime import datetime, timedelta import jwt @@ -9,15 +10,21 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa +# Global test settings +ZITADEL_ISSUER = os.environ["ZITADEL_HOST"] # The Zitadel issuer URL +ZITADEL_PROJECT_ID = os.environ["ZITADEL_PROJECT_ID"] # The project ID where the API app is located +ZITADEL_CLIENT_ID = os.environ["OAUTH_CLIENT_ID"] # The client ID of the API app +ZITADEL_PRIMARY_DOMAIN = "client-fza.region.zitadel.cloud" # The primary domain of a Zitadel client -def zitadel_issuer() -> str: - """Zitadel issuer_url URL used for tests""" - return "https://test-fza01.zitadel.cloud" +def openid_config_url() -> str: + """OpenID configuration URL fixture""" + return f"{ZITADEL_ISSUER}/.well-known/openid-configuration" -def zitadel_primary_domain() -> str: - """Zitadel primary domain used for tests""" - return "client-fza.region.zitadel.cloud" + +def keys_url() -> str: + """OpenID keys URL fixture""" + return f"{ZITADEL_ISSUER}/oauth/v2/keys" def create_test_token( @@ -34,11 +41,11 @@ def create_test_token( """Create JWT tokens for testing""" now = datetime.now() claims = { - "aud": ["wrong-id"] if invalid_aud else ["123456789", "987654321"], - "client_id": "123456789", + "aud": ["wrong-id"] if invalid_aud else [ZITADEL_PROJECT_ID, ZITADEL_CLIENT_ID], + "client_id": ZITADEL_CLIENT_ID, "exp": int((now - timedelta(hours=1)).timestamp()) if expired else int((now + timedelta(hours=1)).timestamp()), "iat": int(now.timestamp()), - "iss": "wrong-issuer" if invalid_iss else zitadel_issuer(), + "iss": "wrong-issuer" if invalid_iss else ZITADEL_ISSUER, "sub": "user123", "nbf": int(now.timestamp()), "jti": "unique-token-id", @@ -46,7 +53,7 @@ def create_test_token( } if role: - claims["urn:zitadel:iam:org:project:987654321:roles"] = {role: {"role_id": zitadel_primary_domain()}} + claims[f"urn:zitadel:iam:org:project:{ZITADEL_PROJECT_ID}:roles"] = {role: {"role_id": ZITADEL_PRIMARY_DOMAIN}} # For evil token use the evil key but claim it's from the valid key signing_key = evil_key if evil else valid_key @@ -100,13 +107,3 @@ def create_openid_keys(empty_keys: bool = False, no_valid_keys: bool = False) -> valid_key = rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=2048) evil_key = rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=2048) - - -def openid_config_url() -> str: - """OpenID configuration URL fixture""" - return f"{zitadel_issuer()}/.well-known/openid-configuration" - - -def keys_url() -> str: - """OpenID keys URL fixture""" - return f"{zitadel_issuer()}/oauth/v2/keys"