Skip to content

Commit

Permalink
Add UM get_authorization_url method
Browse files Browse the repository at this point in the history
  • Loading branch information
blairworkos committed Dec 5, 2023
1 parent d2e7104 commit 91a24b3
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 1 deletion.
2 changes: 1 addition & 1 deletion tests/test_sso.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def test_authorization_url_throws_value_error_with_incorrect_provider_type(
state=self.state,
)

def test_authorization_url_throws_value_error_wihout_redirect_uri(
def test_authorization_url_throws_value_error_without_redirect_uri(
self, setup_with_client_id
):
with pytest.raises(
Expand Down
124 changes: 124 additions & 0 deletions tests/test_user_management.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
from six.moves.urllib.parse import parse_qsl, urlparse
import pytest
import workos

from tests.utils.fixtures.mock_auth_factor_totp import MockAuthFactorTotp
from tests.utils.fixtures.mock_invitation import MockInvitation
from tests.utils.fixtures.mock_organization_membership import MockOrganizationMembership
from tests.utils.fixtures.mock_session import MockSession
from tests.utils.fixtures.mock_user import MockUser
from workos.user_management import UserManagement
from workos.utils.request import RESPONSE_TYPE_CODE


class TestUserManagement(object):
Expand Down Expand Up @@ -347,6 +351,126 @@ def test_delete_organization_membership(self, capture_and_mock_request):
assert url[0].endswith("user_management/organization_memberships/om_ABCDE")
assert user is None

def test_authorization_url_throws_value_error_with_missing_connection_organization_and_provider(
self,
):
redirect_uri = "https://localhost/auth/callback"
with pytest.raises(ValueError, match=r"Incomplete arguments.*"):
self.user_management.get_authorization_url(redirect_uri=redirect_uri)

def test_authorization_url_has_expected_query_params_with_connection_id(self):
connection_id = "connection_123"
redirect_uri = "https://localhost/auth/callback"
authorization_url = self.user_management.get_authorization_url(
connection_id=connection_id,
redirect_uri=redirect_uri,
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"connection_id": connection_id,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authorization_url_has_expected_query_params_with_organization_id(self):
organization_id = "organization_123"
redirect_uri = "https://localhost/auth/callback"
authorization_url = self.user_management.get_authorization_url(
organization_id=organization_id,
redirect_uri=redirect_uri,
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"organization_id": organization_id,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authorization_url_has_expected_query_params_with_provider(self):
provider = "GoogleOAuth"
redirect_uri = "https://localhost/auth/callback"
authorization_url = self.user_management.get_authorization_url(
provider=provider, redirect_uri=redirect_uri
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"provider": provider,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authorization_url_has_expected_query_params_with_domain_hint(self):
connection_id = "connection_123"
redirect_uri = "https://localhost/auth/callback"
domain_hint = "workos.com"

authorization_url = self.user_management.get_authorization_url(
connection_id=connection_id,
domain_hint=domain_hint,
redirect_uri=redirect_uri,
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"domain_hint": domain_hint,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"connection_id": connection_id,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authorization_url_has_expected_query_params_with_login_hint(self):
connection_id = "connection_123"
redirect_uri = "https://localhost/auth/callback"
login_hint = "foo@workos.com"

authorization_url = self.user_management.get_authorization_url(
connection_id=connection_id,
login_hint=login_hint,
redirect_uri=redirect_uri,
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"login_hint": login_hint,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"connection_id": connection_id,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authorization_url_has_expected_query_params_with_state(self):
connection_id = "connection_123"
redirect_uri = "https://localhost/auth/callback"
state = json.dumps({"things": "with_stuff"})

authorization_url = self.user_management.get_authorization_url(
connection_id=connection_id,
state=state,
redirect_uri=redirect_uri,
)

parsed_url = urlparse(authorization_url)

assert dict(parse_qsl(parsed_url.query)) == {
"state": state,
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"connection_id": connection_id,
"response_type": RESPONSE_TYPE_CODE,
}

def test_authenticate_with_password(
self, capture_and_mock_request, mock_auth_response
):
Expand Down
69 changes: 69 additions & 0 deletions workos/user_management.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from requests import Request
import workos
from workos.resources.list import WorkOSListResource
from workos.resources.mfa import WorkOSAuthenticationFactorTotp, WorkOSChallenge
Expand All @@ -11,6 +12,7 @@
from workos.utils.pagination_order import Order
from workos.utils.request import (
RequestHelper,
RESPONSE_TYPE_CODE,
REQUEST_METHOD_POST,
REQUEST_METHOD_GET,
REQUEST_METHOD_DELETE,
Expand All @@ -22,6 +24,7 @@
USER_DETAIL_PATH = "user_management/users/{0}"
ORGANIZATION_MEMBERSHIP_PATH = "user_management/organization_memberships"
ORGANIZATION_MEMBERSHIP_DETAIL_PATH = "user_management/organization_memberships/{0}"
USER_AUTHORIZATION_PATH = "user_management/authorize"
USER_AUTHENTICATE_PATH = "user_management/authenticate"
USER_SEND_PASSWORD_RESET_PATH = "user_management/password_reset/send"
USER_RESET_PASSWORD_PATH = "user_management/password_reset/confirm"
Expand Down Expand Up @@ -316,6 +319,72 @@ def delete_organization_membership(self, organization_membership_id):
token=workos.api_key,
)

def get_authorization_url(
self,
redirect_uri,
connection_id=None,
organization_id=None,
provider=None,
domain_hint=None,
login_hint=None,
state=None,
):
"""Generate an OAuth 2.0 authorization URL.
The URL generated will redirect a User to the Identity Provider configured through
WorkOS.
Kwargs:
redirect_uri (str) - A Redirect URI to return an authorized user to.
connection_id (str) - The connection_id connection selector is used to initiate SSO for a Connection.
The value of this parameter should be a WorkOS Connection ID. (Optional)
organization_id (str) - The organization_id connection selector is used to initiate SSO for an Organization.
The value of this parameter should be a WorkOS Organization ID. (Optional)
provider (str) - The provider connection selector is used to initiate SSO using an OAuth-compatible provider.
Currently, the supported values for provider are 'authkit', 'GoogleOAuth' and 'MicrosoftOAuth'. (Optional)
domain_hint (str) - Can be used to pre-fill the domain field when initiating authentication with Microsoft OAuth,
or with a GoogleSAML connection type. (Optional)
login_hint (str) - Can be used to pre-fill the username/email address field of the IdP sign-in page for the user,
if you know their username ahead of time. Currently, this parameter is supported for OAuth, OpenID Connect,
OktaSAML, and AzureSAML connection types. (Optional)
state (str) - An encoded string passed to WorkOS that'd be preserved through the authentication workflow, passed
back as a query parameter. (Optional)
Returns:
str: URL to redirect a User to to begin the OAuth workflow with WorkOS
"""
params = {
"client_id": workos.client_id,
"redirect_uri": redirect_uri,
"response_type": RESPONSE_TYPE_CODE,
}

if connection_id is None and organization_id is None and provider is None:
raise ValueError(
"Incomplete arguments. Need to specify either a 'connection_id', 'organization_id', or 'provider_id'"
)

if connection_id is not None:
params["connection_id"] = connection_id
if organization_id is not None:
params["organization_id"] = organization_id
if provider is not None:
params["provider"] = provider
if domain_hint is not None:
params["domain_hint"] = domain_hint
if login_hint is not None:
params["login_hint"] = login_hint
if state is not None:
params["state"] = state

prepared_request = Request(
"GET",
self.request_helper.generate_api_url(USER_AUTHORIZATION_PATH),
params=params,
).prepare()

return prepared_request.url

def authenticate_with_password(
self,
email,
Expand Down

0 comments on commit 91a24b3

Please sign in to comment.