Skip to content

Commit

Permalink
Update httpx error handler (#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
rikroe authored Mar 28, 2023
1 parent 0a81d51 commit 1a0e421
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 144 deletions.
236 changes: 118 additions & 118 deletions bimmer_connected/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
create_s256_code_challenge,
generate_token,
get_correlation_id,
handle_http_status_error,
handle_httpstatuserror,
)
from bimmer_connected.const import (
AUTH_CHINA_LOGIN_URL,
Expand All @@ -31,14 +31,15 @@
USER_AGENT,
X_USER_AGENT,
)
from bimmer_connected.models import MyBMWAPIError

EXPIRES_AT_OFFSET = datetime.timedelta(seconds=HTTPX_TIMEOUT * 2)

_LOGGER = logging.getLogger(__name__)


class MyBMWAuthentication(httpx.Auth):
"""Authentication for MyBMW API."""
"""Authentication and Retry Handler for MyBMW API."""

def __init__(
self,
Expand Down Expand Up @@ -97,7 +98,10 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.
await asyncio.sleep(wait_time)
response = yield request
# Raise if still error after 3rd retry
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as ex:
await handle_httpstatuserror(ex, log_handler=_LOGGER)

async def login(self) -> None:
"""Get a valid OAuth token."""
Expand All @@ -124,80 +128,76 @@ async def login(self) -> None:

async def _login_row_na(self):
"""Login to Rest of World and North America."""
try:
async with MyBMWLoginClient(region=self.region) as client:
_LOGGER.debug("Authenticating with MyBMW flow for North America & Rest of World.")

# Get OAuth2 settings from BMW API
r_oauth_settings = await client.get(
OAUTH_CONFIG_URL,
headers={
"ocp-apim-subscription-key": get_ocp_apim_key(self.region),
"bmw-session-id": self.session_id,
**get_correlation_id(),
},
)
oauth_settings = r_oauth_settings.json()

# Generate OAuth2 Code Challenge + State
code_verifier = generate_token(86)
code_challenge = create_s256_code_challenge(code_verifier)

state = generate_token(22)

# Set up authenticate endpoint
authenticate_url = oauth_settings["tokenEndpoint"].replace("/token", "/authenticate")
oauth_base_values = {
"client_id": oauth_settings["clientId"],
"response_type": "code",
"redirect_uri": oauth_settings["returnUrl"],
"state": state,
"nonce": "login_nonce",
"scope": " ".join(oauth_settings["scopes"]),
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}

# Call authenticate endpoint first time (with user/pw) and get authentication
response = await client.post(
authenticate_url,
data=dict(
oauth_base_values,
**{
"grant_type": "authorization_code",
"username": self.username,
"password": self.password,
},
),
)
authorization = httpx.URL(response.json()["redirect_to"]).params["authorization"]

# With authorization, call authenticate endpoint second time to get code
response = await client.post(
authenticate_url,
data=dict(oauth_base_values, **{"authorization": authorization}),
)
code = response.next_request.url.params["code"]

# With code, get token
current_utc_time = datetime.datetime.utcnow()
response = await client.post(
oauth_settings["tokenEndpoint"],
data={
"code": code,
"code_verifier": code_verifier,
"redirect_uri": oauth_settings["returnUrl"],
async with MyBMWLoginClient(region=self.region) as client:
_LOGGER.debug("Authenticating with MyBMW flow for North America & Rest of World.")

# Get OAuth2 settings from BMW API
r_oauth_settings = await client.get(
OAUTH_CONFIG_URL,
headers={
"ocp-apim-subscription-key": get_ocp_apim_key(self.region),
"bmw-session-id": self.session_id,
**get_correlation_id(),
},
)
oauth_settings = r_oauth_settings.json()

# Generate OAuth2 Code Challenge + State
code_verifier = generate_token(86)
code_challenge = create_s256_code_challenge(code_verifier)

state = generate_token(22)

# Set up authenticate endpoint
authenticate_url = oauth_settings["tokenEndpoint"].replace("/token", "/authenticate")
oauth_base_values = {
"client_id": oauth_settings["clientId"],
"response_type": "code",
"redirect_uri": oauth_settings["returnUrl"],
"state": state,
"nonce": "login_nonce",
"scope": " ".join(oauth_settings["scopes"]),
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}

# Call authenticate endpoint first time (with user/pw) and get authentication
response = await client.post(
authenticate_url,
data=dict(
oauth_base_values,
**{
"grant_type": "authorization_code",
"username": self.username,
"password": self.password,
},
auth=(oauth_settings["clientId"], oauth_settings["clientSecret"]),
)
response_json = response.json()

expiration_time = int(response_json["expires_in"])
expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time)
),
)
authorization = httpx.URL(response.json()["redirect_to"]).params["authorization"]

# With authorization, call authenticate endpoint second time to get code
response = await client.post(
authenticate_url,
data=dict(oauth_base_values, **{"authorization": authorization}),
)
code = response.next_request.url.params["code"]

# With code, get token
current_utc_time = datetime.datetime.utcnow()
response = await client.post(
oauth_settings["tokenEndpoint"],
data={
"code": code,
"code_verifier": code_verifier,
"redirect_uri": oauth_settings["returnUrl"],
"grant_type": "authorization_code",
},
auth=(oauth_settings["clientId"], oauth_settings["clientSecret"]),
)
response_json = response.json()

except httpx.HTTPStatusError as ex:
handle_http_status_error(ex, "Authentication", _LOGGER)
expiration_time = int(response_json["expires_in"])
expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time)

return {
"access_token": response_json["access_token"],
Expand Down Expand Up @@ -239,9 +239,8 @@ async def _refresh_token_row_na(self):
expiration_time = int(response_json["expires_in"])
expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time)

except httpx.HTTPStatusError as ex:
_LOGGER.debug("Unable to get access token using refresh token.")
handle_http_status_error(ex, "Authentication", _LOGGER, debug=True)
except MyBMWAPIError:
_LOGGER.debug("Unable to get access token using refresh token, falling back to username/password.")
return {}

return {
Expand All @@ -251,38 +250,34 @@ async def _refresh_token_row_na(self):
}

async def _login_china(self):
try:
async with MyBMWLoginClient(region=self.region) as client:
_LOGGER.debug("Authenticating with MyBMW flow for China.")

# Get current RSA public certificate & use it to encrypt password
response = await client.get(
AUTH_CHINA_PUBLIC_KEY_URL,
)
pem_public_key = response.json()["data"]["value"]

public_key = RSA.import_key(pem_public_key)
cipher_rsa = PKCS1_v1_5.new(public_key)
encrypted = cipher_rsa.encrypt(self.password.encode())
pw_encrypted = base64.b64encode(encrypted).decode("UTF-8")

cipher_aes = AES.new(**get_aes_keys(self.region), mode=AES.MODE_CBC)
nonce = f"{self.username}|{datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')}".encode()

# Get token
response = await client.post(
AUTH_CHINA_LOGIN_URL,
headers={"x-login-nonce": base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode()},
json={"mobile": self.username, "password": pw_encrypted},
)
response_json = response.json()["data"]

decoded_token = jwt.decode(
response_json["access_token"], algorithms=["HS256"], options={"verify_signature": False}
)

except httpx.HTTPStatusError as ex:
handle_http_status_error(ex, "Authentication", _LOGGER)
async with MyBMWLoginClient(region=self.region) as client:
_LOGGER.debug("Authenticating with MyBMW flow for China.")

# Get current RSA public certificate & use it to encrypt password
response = await client.get(
AUTH_CHINA_PUBLIC_KEY_URL,
)
pem_public_key = response.json()["data"]["value"]

public_key = RSA.import_key(pem_public_key)
cipher_rsa = PKCS1_v1_5.new(public_key)
encrypted = cipher_rsa.encrypt(self.password.encode())
pw_encrypted = base64.b64encode(encrypted).decode("UTF-8")

cipher_aes = AES.new(**get_aes_keys(self.region), mode=AES.MODE_CBC)
nonce = f"{self.username}|{datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')}".encode()

# Get token
response = await client.post(
AUTH_CHINA_LOGIN_URL,
headers={"x-login-nonce": base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode()},
json={"mobile": self.username, "password": pw_encrypted},
)
response_json = response.json()["data"]

decoded_token = jwt.decode(
response_json["access_token"], algorithms=["HS256"], options={"verify_signature": False}
)

return {
"access_token": response_json["access_token"],
Expand Down Expand Up @@ -310,9 +305,8 @@ async def _refresh_token_china(self):
expiration_time = int(response_json["expires_in"])
expires_at = current_utc_time + datetime.timedelta(seconds=expiration_time)

except httpx.HTTPStatusError as ex:
_LOGGER.debug("Unable to get access token using refresh token.")
handle_http_status_error(ex, "Authentication", _LOGGER, debug=True)
except MyBMWAPIError:
_LOGGER.debug("Unable to get access token using refresh token, falling back to username/password.")
return {}

return {
Expand Down Expand Up @@ -346,11 +340,13 @@ def __init__(self, *args, **kwargs):
async def raise_for_status_event_handler(response: httpx.Response):
"""Event handler that automatically raises HTTPStatusErrors when attached.
Will only raise on 4xx/5xx errors (but not on 429) and not raise on 3xx.
Will only raise on 4xx/5xx errors but not 429 which is handled `self.auth`.
"""
if response.is_error and not response.status_code == 429:
await response.aread()
response.raise_for_status()
if response.is_error and response.status_code != 429:
try:
response.raise_for_status()
except httpx.HTTPStatusError as ex:
await handle_httpstatuserror(ex, log_handler=_LOGGER)

kwargs["event_hooks"]["response"].append(raise_for_status_event_handler)

Expand All @@ -376,6 +372,10 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.
_LOGGER.debug("Sleeping %s seconds due to 429 Too Many Requests", wait_time)
await asyncio.sleep(wait_time)
response = yield request
# Only checking for 429 errors, as all other errors are handled by the
# response hook of MyBMWLoginClient
if response.status_code == 429:
await response.aread()
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as ex:
await handle_httpstatuserror(ex, log_handler=_LOGGER)
13 changes: 9 additions & 4 deletions bimmer_connected/api/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Generic API management."""

import logging
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional
Expand All @@ -8,10 +9,12 @@

from bimmer_connected.api.authentication import MyBMWAuthentication
from bimmer_connected.api.regions import get_app_version, get_server_url
from bimmer_connected.api.utils import anonymize_response, get_correlation_id
from bimmer_connected.api.utils import anonymize_response, get_correlation_id, handle_httpstatuserror
from bimmer_connected.const import HTTPX_TIMEOUT, USER_AGENT, X_USER_AGENT, CarBrands
from bimmer_connected.models import AnonymizedResponse, GPSPosition

_LOGGER = logging.getLogger(__name__)

RESPONSE_STORE: Deque[AnonymizedResponse] = deque(maxlen=10)


Expand Down Expand Up @@ -62,11 +65,13 @@ async def log_response(response: httpx.Response):
async def raise_for_status_event_handler(response: httpx.Response):
"""Event handler that automatically raises HTTPStatusErrors when attached.
Will only raise on 4xx/5xx errors (but not 401!) and not raise on 3xx.
Will only raise on 4xx/5xx errors but not 401/429 which are handled `self.auth`.
"""
if response.is_error and response.status_code not in [401, 429]:
await response.aread()
response.raise_for_status()
try:
response.raise_for_status()
except httpx.HTTPStatusError as ex:
await handle_httpstatuserror(ex, log_handler=_LOGGER)

kwargs["event_hooks"]["response"].append(raise_for_status_event_handler)

Expand Down
Loading

0 comments on commit 1a0e421

Please sign in to comment.