Skip to content

Commit

Permalink
Fix china login (#534)
Browse files Browse the repository at this point in the history
* Changes Regions.CHINA APP_VERSIONS to 3.1.0

* Adjust china login and refresh token logic

* Add api test mock for captcha url

* fix test coverage issue

* Remove unused code

* fix lint and style issue

* optimize the captcha logic and test

* Make the check captcha status success code to 201
  • Loading branch information
Yixi authored May 16, 2023
1 parent 6a5f292 commit e9f799f
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 25 deletions.
41 changes: 34 additions & 7 deletions bimmer_connected/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@

import httpx
import jwt
from Crypto.Cipher import AES, PKCS1_v1_5
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad

from bimmer_connected.api.regions import Regions, get_aes_keys, get_app_version, get_ocp_apim_key, get_server_url
from bimmer_connected.api.regions import Regions, get_app_version, get_ocp_apim_key, get_server_url
from bimmer_connected.api.utils import (
create_s256_code_challenge,
generate_cn_nonce,
generate_token,
get_correlation_id,
handle_httpstatuserror,
)
from bimmer_connected.const import (
AUTH_CHINA_CAPTCHA_CHECK_URL,
AUTH_CHINA_CAPTCHA_URL,
AUTH_CHINA_LOGIN_URL,
AUTH_CHINA_PUBLIC_KEY_URL,
AUTH_CHINA_TOKEN_URL,
Expand All @@ -49,6 +51,7 @@ def __init__(
access_token: Optional[str] = None,
expires_at: Optional[datetime.datetime] = None,
refresh_token: Optional[str] = None,
gcid: Optional[str] = None,
):
self.username: str = username
self.password: str = password
Expand All @@ -58,6 +61,7 @@ def __init__(
self.refresh_token: Optional[str] = refresh_token
self.session_id: str = str(uuid4())
self._lock: Optional[asyncio.Lock] = None
self.gcid: Optional[str] = gcid

@property
def login_lock(self) -> asyncio.Lock:
Expand Down Expand Up @@ -121,6 +125,7 @@ async def login(self) -> None:
if not token_data:
token_data = await self._login_china()
token_data["expires_at"] = token_data["expires_at"] - EXPIRES_AT_OFFSET
self.gcid = token_data["gcid"]

self.access_token = token_data["access_token"]
self.expires_at = token_data["expires_at"]
Expand Down Expand Up @@ -270,14 +275,33 @@ async def _login_china(self):
encrypted = cipher_rsa.encrypt(self.password.encode())
pw_encrypted = base64.b64encode(encrypted).decode("UTF-8")

cipher_aes = AES.new(**get_aes_keys(self.region), mode=AES.MODE_CBC)
nonce = f"{self.username}|{datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')}".encode()
captcha_res = await client.post(
AUTH_CHINA_CAPTCHA_URL,
json={"mobile": self.username},
)
verify_id = captcha_res.json()["data"]["verifyId"]

for i in range(1, 13):
try:
captcha_check_res = await client.post(
AUTH_CHINA_CAPTCHA_CHECK_URL,
json={"position": 0.74 + i / 100, "verifyId": verify_id},
)
if captcha_check_res.status_code == 201:
break
except MyBMWAPIError:
continue

# Get token
response = await client.post(
AUTH_CHINA_LOGIN_URL,
headers={"x-login-nonce": base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode()},
json={"mobile": self.username, "password": pw_encrypted},
headers={"x-login-nonce": generate_cn_nonce(self.username)},
json={
"mobile": self.username,
"password": pw_encrypted,
"verifyId": verify_id,
"deviceId": self.username,
},
)
response_json = response.json()["data"]

Expand All @@ -289,6 +313,7 @@ async def _login_china(self):
"access_token": response_json["access_token"],
"expires_at": datetime.datetime.utcfromtimestamp(decoded_token["exp"]),
"refresh_token": response_json["refresh_token"],
"gcid": response_json["gcid"],
}

async def _refresh_token_china(self):
Expand All @@ -301,6 +326,7 @@ async def _refresh_token_china(self):
# Try logging in using refresh_token
response = await client.post(
AUTH_CHINA_TOKEN_URL,
headers={"x-login-nonce": generate_cn_nonce(self.gcid)},
data={
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
Expand All @@ -319,6 +345,7 @@ async def _refresh_token_china(self):
"access_token": response_json["access_token"],
"expires_at": expires_at,
"refresh_token": response_json["refresh_token"],
"gcid": response_json["gcid"],
}


Expand Down
9 changes: 2 additions & 7 deletions bimmer_connected/api/regions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Get the right url for the different countries."""
from base64 import b64decode
from typing import Dict, List
from typing import List

from bimmer_connected.const import AES_KEYS, APP_VERSIONS, OCP_APIM_KEYS, SERVER_URLS_MYBMW, Regions
from bimmer_connected.const import APP_VERSIONS, OCP_APIM_KEYS, SERVER_URLS_MYBMW, Regions


def valid_regions() -> List[str]:
Expand Down Expand Up @@ -34,8 +34,3 @@ def get_app_version(region: Regions) -> str:
def get_ocp_apim_key(region: Regions) -> str:
"""Get the authorization for OAuth settings."""
return b64decode(OCP_APIM_KEYS[region]).decode()


def get_aes_keys(region: Regions) -> Dict[str, bytes]:
"""Get the keys for login nonce."""
return {k: b64decode(v) for k, v in AES_KEYS[region].items()}
35 changes: 35 additions & 0 deletions bimmer_connected/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Utils for bimmer_connected.api."""

import base64
import datetime
import hashlib
import json
import logging
Expand All @@ -12,6 +13,9 @@
from uuid import uuid4

import httpx
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Util.Padding import pad

from bimmer_connected.models import AnonymizedResponse, MyBMWAPIError, MyBMWAuthError

Expand Down Expand Up @@ -137,3 +141,34 @@ def anonymize_response(response: httpx.Response) -> AnonymizedResponse:
file_extension = mimetypes.guess_extension(content_type or ".txt")

return AnonymizedResponse(f"{brand}{url_path}{file_extension}", content)


def generate_random_base64_string(size: int) -> str:
"""Generate a random base64 string with size."""
return base64.b64encode(bytes(random.randint(0, 255) for _ in range(size))).decode()[:size]


def generate_cn_nonce(username: str) -> str:
"""Generate a x-login-nonce string."""
key = generate_random_base64_string(16)
iv = generate_random_base64_string(16)

k1 = key[:8]
i1 = iv[:8]
k2 = key[8:]
i2 = iv[8:]

sha256_hex = SHA256.new((k1 + i1 + "u3.1.0" + k2 + i2).encode()).hexdigest()
sha256_a = sha256_hex[:32]
sha256_b = sha256_hex[32:]

possible_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
random_str = "".join(random.choice(possible_chars) for _ in range(8))

phone_text = f"{username}&{datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')}&{random_str}"

cipher_aes = AES.new(key.encode(), AES.MODE_CBC, iv.encode())

aes_hex = cipher_aes.encrypt(pad(phone_text.encode(), AES.block_size)).hex()

return k1 + i1 + sha256_a + aes_hex + k2 + i2 + sha256_b
9 changes: 2 additions & 7 deletions bimmer_connected/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ class Regions(str, Enum):
Regions.REST_OF_WORLD: "NGYxYzg1YTMtNzU4Zi1hMzdkLWJiYjYtZjg3MDQ0OTRhY2Zh",
}

AES_KEYS = {
Regions.CHINA: {
"key": "UzJUdzEwdlExWGYySmxLYQ==",
"iv": "dTFGUDd4ZWRrQWhMR3ozVQ==",
}
}

APP_VERSIONS = {
Regions.NORTH_AMERICA: "3.3.1(22418)",
Regions.REST_OF_WORLD: "3.3.1(22418)",
Expand All @@ -58,6 +51,8 @@ class Regions(str, Enum):
AUTH_CHINA_PUBLIC_KEY_URL = "/eadrax-coas/v1/cop/publickey"
AUTH_CHINA_LOGIN_URL = "/eadrax-coas/v2/login/pwd"
AUTH_CHINA_TOKEN_URL = "/eadrax-coas/v2/oauth/token"
AUTH_CHINA_CAPTCHA_URL = "/eadrax-coas/v2/cop/slider-captcha"
AUTH_CHINA_CAPTCHA_CHECK_URL = "/eadrax-coas/v1/cop/check-captcha"

OAUTH_CONFIG_URL = "/eadrax-ucs/v1/presentation/oauth/config"

Expand Down
5 changes: 3 additions & 2 deletions test/responses/auth/auth_cn_login_pwd.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"token_type": "Bearer",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJEVU1NWSQxJFIkMTYzNzcwNTc5NTA3NSIsIm5iZiI6MTYzNzcwNTc5NSwiZXhwIjoxNjQ1NDgwODk1LCJpYXQiOjE2Mzc3MDU3OTV9.dGVpbpfrJOo895jiU6Rk16ESYz80klfJbIX9M4KD1hQ",
"usid": "DUMMY",
"cid": "DUMMY"
"cid": "DUMMY",
"gcid": "DUMMY"
},
"code": 200,
"error": false,
"description": "ok"
}
}
10 changes: 10 additions & 0 deletions test/responses/auth/auth_slider_captcha.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"description": "ok",
"code": 200,
"error": false,
"data": {
"verifyId": "f40f54f1cb40463ba7188bd73e906d33",
"backGroundImg": "/xxxx",
"cutImg": "xxxx"
}
}
6 changes: 6 additions & 0 deletions test/responses/auth/auth_slider_captcha_check.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"description": "ok",
"code": 200,
"error": false,
"data": null
}
6 changes: 4 additions & 2 deletions test/responses/auth/auth_token.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
"token_type": "Bearer",
"expires_in": 28799,
"refresh_token": "another_token_string",
"scope": "authenticate_user vehicle_data remote_services"
}
"scope": "authenticate_user vehicle_data remote_services",
"usid": "DUMMY",
"gcid": "DUMMY"
}
11 changes: 11 additions & 0 deletions test/test_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ def account_mock():
router.get("/eadrax-coas/v1/cop/publickey").respond(
200, json=load_response(RESPONSE_DIR / "auth" / "auth_cn_publickey.json")
)
router.post("/eadrax-coas/v2/cop/slider-captcha").respond(
200, json=load_response(RESPONSE_DIR / "auth" / "auth_slider_captcha.json")
)

router.post("/eadrax-coas/v1/cop/check-captcha").mock(
side_effect=[
httpx.Response(422),
httpx.Response(201, json=load_response(RESPONSE_DIR / "auth" / "auth_slider_captcha_check.json")),
]
)

router.post("/eadrax-coas/v2/login/pwd").respond(
200, json=load_response(RESPONSE_DIR / "auth" / "auth_cn_login_pwd.json")
)
Expand Down

0 comments on commit e9f799f

Please sign in to comment.