diff --git a/bimmer_connected/api/authentication.py b/bimmer_connected/api/authentication.py index 5ed6c65c..4c01dbab 100644 --- a/bimmer_connected/api/authentication.py +++ b/bimmer_connected/api/authentication.py @@ -11,18 +11,20 @@ import httpx import jwt -from Crypto.Cipher import AES, PKCS1_v1_5 +from Crypto.Cipher import PKCS1_v1_5 from Crypto.PublicKey import RSA -from Crypto.Util.Padding import pad -from bimmer_connected.api.regions import Regions, get_aes_keys, get_app_version, get_ocp_apim_key, get_server_url +from bimmer_connected.api.regions import Regions, get_app_version, get_ocp_apim_key, get_server_url from bimmer_connected.api.utils import ( create_s256_code_challenge, + generate_cn_nonce, generate_token, get_correlation_id, handle_httpstatuserror, ) from bimmer_connected.const import ( + AUTH_CHINA_CAPTCHA_CHECK_URL, + AUTH_CHINA_CAPTCHA_URL, AUTH_CHINA_LOGIN_URL, AUTH_CHINA_PUBLIC_KEY_URL, AUTH_CHINA_TOKEN_URL, @@ -49,6 +51,7 @@ def __init__( access_token: Optional[str] = None, expires_at: Optional[datetime.datetime] = None, refresh_token: Optional[str] = None, + gcid: Optional[str] = None, ): self.username: str = username self.password: str = password @@ -58,6 +61,7 @@ def __init__( self.refresh_token: Optional[str] = refresh_token self.session_id: str = str(uuid4()) self._lock: Optional[asyncio.Lock] = None + self.gcid: Optional[str] = gcid @property def login_lock(self) -> asyncio.Lock: @@ -121,6 +125,7 @@ async def login(self) -> None: if not token_data: token_data = await self._login_china() token_data["expires_at"] = token_data["expires_at"] - EXPIRES_AT_OFFSET + self.gcid = token_data["gcid"] self.access_token = token_data["access_token"] self.expires_at = token_data["expires_at"] @@ -270,14 +275,33 @@ async def _login_china(self): encrypted = cipher_rsa.encrypt(self.password.encode()) pw_encrypted = base64.b64encode(encrypted).decode("UTF-8") - cipher_aes = AES.new(**get_aes_keys(self.region), mode=AES.MODE_CBC) - nonce = f"{self.username}|{datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%fZ')}".encode() + captcha_res = await client.post( + AUTH_CHINA_CAPTCHA_URL, + json={"mobile": self.username}, + ) + verify_id = captcha_res.json()["data"]["verifyId"] + + for i in range(1, 13): + try: + captcha_check_res = await client.post( + AUTH_CHINA_CAPTCHA_CHECK_URL, + json={"position": 0.74 + i / 100, "verifyId": verify_id}, + ) + if captcha_check_res.status_code == 201: + break + except MyBMWAPIError: + continue # Get token response = await client.post( AUTH_CHINA_LOGIN_URL, - headers={"x-login-nonce": base64.b64encode(cipher_aes.encrypt(pad(nonce, 16))).decode()}, - json={"mobile": self.username, "password": pw_encrypted}, + headers={"x-login-nonce": generate_cn_nonce(self.username)}, + json={ + "mobile": self.username, + "password": pw_encrypted, + "verifyId": verify_id, + "deviceId": self.username, + }, ) response_json = response.json()["data"] @@ -289,6 +313,7 @@ async def _login_china(self): "access_token": response_json["access_token"], "expires_at": datetime.datetime.utcfromtimestamp(decoded_token["exp"]), "refresh_token": response_json["refresh_token"], + "gcid": response_json["gcid"], } async def _refresh_token_china(self): @@ -301,6 +326,7 @@ async def _refresh_token_china(self): # Try logging in using refresh_token response = await client.post( AUTH_CHINA_TOKEN_URL, + headers={"x-login-nonce": generate_cn_nonce(self.gcid)}, data={ "refresh_token": self.refresh_token, "grant_type": "refresh_token", @@ -319,6 +345,7 @@ async def _refresh_token_china(self): "access_token": response_json["access_token"], "expires_at": expires_at, "refresh_token": response_json["refresh_token"], + "gcid": response_json["gcid"], } diff --git a/bimmer_connected/api/regions.py b/bimmer_connected/api/regions.py index e2918e57..82425548 100644 --- a/bimmer_connected/api/regions.py +++ b/bimmer_connected/api/regions.py @@ -1,8 +1,8 @@ """Get the right url for the different countries.""" from base64 import b64decode -from typing import Dict, List +from typing import List -from bimmer_connected.const import AES_KEYS, APP_VERSIONS, OCP_APIM_KEYS, SERVER_URLS_MYBMW, Regions +from bimmer_connected.const import APP_VERSIONS, OCP_APIM_KEYS, SERVER_URLS_MYBMW, Regions def valid_regions() -> List[str]: @@ -34,8 +34,3 @@ def get_app_version(region: Regions) -> str: def get_ocp_apim_key(region: Regions) -> str: """Get the authorization for OAuth settings.""" return b64decode(OCP_APIM_KEYS[region]).decode() - - -def get_aes_keys(region: Regions) -> Dict[str, bytes]: - """Get the keys for login nonce.""" - return {k: b64decode(v) for k, v in AES_KEYS[region].items()} diff --git a/bimmer_connected/api/utils.py b/bimmer_connected/api/utils.py index 34a94154..93de5f63 100644 --- a/bimmer_connected/api/utils.py +++ b/bimmer_connected/api/utils.py @@ -1,6 +1,7 @@ """Utils for bimmer_connected.api.""" import base64 +import datetime import hashlib import json import logging @@ -12,6 +13,9 @@ from uuid import uuid4 import httpx +from Crypto.Cipher import AES +from Crypto.Hash import SHA256 +from Crypto.Util.Padding import pad from bimmer_connected.models import AnonymizedResponse, MyBMWAPIError, MyBMWAuthError @@ -137,3 +141,34 @@ def anonymize_response(response: httpx.Response) -> AnonymizedResponse: file_extension = mimetypes.guess_extension(content_type or ".txt") return AnonymizedResponse(f"{brand}{url_path}{file_extension}", content) + + +def generate_random_base64_string(size: int) -> str: + """Generate a random base64 string with size.""" + return base64.b64encode(bytes(random.randint(0, 255) for _ in range(size))).decode()[:size] + + +def generate_cn_nonce(username: str) -> str: + """Generate a x-login-nonce string.""" + key = generate_random_base64_string(16) + iv = generate_random_base64_string(16) + + k1 = key[:8] + i1 = iv[:8] + k2 = key[8:] + i2 = iv[8:] + + sha256_hex = SHA256.new((k1 + i1 + "u3.1.0" + k2 + i2).encode()).hexdigest() + sha256_a = sha256_hex[:32] + sha256_b = sha256_hex[32:] + + possible_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + random_str = "".join(random.choice(possible_chars) for _ in range(8)) + + phone_text = f"{username}&{datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')}&{random_str}" + + cipher_aes = AES.new(key.encode(), AES.MODE_CBC, iv.encode()) + + aes_hex = cipher_aes.encrypt(pad(phone_text.encode(), AES.block_size)).hex() + + return k1 + i1 + sha256_a + aes_hex + k2 + i2 + sha256_b diff --git a/bimmer_connected/const.py b/bimmer_connected/const.py index 021f3c73..27995bdb 100644 --- a/bimmer_connected/const.py +++ b/bimmer_connected/const.py @@ -36,13 +36,6 @@ class Regions(str, Enum): Regions.REST_OF_WORLD: "NGYxYzg1YTMtNzU4Zi1hMzdkLWJiYjYtZjg3MDQ0OTRhY2Zh", } -AES_KEYS = { - Regions.CHINA: { - "key": "UzJUdzEwdlExWGYySmxLYQ==", - "iv": "dTFGUDd4ZWRrQWhMR3ozVQ==", - } -} - APP_VERSIONS = { Regions.NORTH_AMERICA: "3.3.1(22418)", Regions.REST_OF_WORLD: "3.3.1(22418)", @@ -58,6 +51,8 @@ class Regions(str, Enum): AUTH_CHINA_PUBLIC_KEY_URL = "/eadrax-coas/v1/cop/publickey" AUTH_CHINA_LOGIN_URL = "/eadrax-coas/v2/login/pwd" AUTH_CHINA_TOKEN_URL = "/eadrax-coas/v2/oauth/token" +AUTH_CHINA_CAPTCHA_URL = "/eadrax-coas/v2/cop/slider-captcha" +AUTH_CHINA_CAPTCHA_CHECK_URL = "/eadrax-coas/v1/cop/check-captcha" OAUTH_CONFIG_URL = "/eadrax-ucs/v1/presentation/oauth/config" diff --git a/test/responses/auth/auth_cn_login_pwd.json b/test/responses/auth/auth_cn_login_pwd.json index e9d53985..52958a79 100644 --- a/test/responses/auth/auth_cn_login_pwd.json +++ b/test/responses/auth/auth_cn_login_pwd.json @@ -4,9 +4,10 @@ "token_type": "Bearer", "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiJEVU1NWSQxJFIkMTYzNzcwNTc5NTA3NSIsIm5iZiI6MTYzNzcwNTc5NSwiZXhwIjoxNjQ1NDgwODk1LCJpYXQiOjE2Mzc3MDU3OTV9.dGVpbpfrJOo895jiU6Rk16ESYz80klfJbIX9M4KD1hQ", "usid": "DUMMY", - "cid": "DUMMY" + "cid": "DUMMY", + "gcid": "DUMMY" }, "code": 200, "error": false, "description": "ok" -} \ No newline at end of file +} diff --git a/test/responses/auth/auth_slider_captcha.json b/test/responses/auth/auth_slider_captcha.json new file mode 100644 index 00000000..9fd05d7a --- /dev/null +++ b/test/responses/auth/auth_slider_captcha.json @@ -0,0 +1,10 @@ +{ + "description": "ok", + "code": 200, + "error": false, + "data": { + "verifyId": "f40f54f1cb40463ba7188bd73e906d33", + "backGroundImg": "/xxxx", + "cutImg": "xxxx" + } +} diff --git a/test/responses/auth/auth_slider_captcha_check.json b/test/responses/auth/auth_slider_captcha_check.json new file mode 100644 index 00000000..2db34946 --- /dev/null +++ b/test/responses/auth/auth_slider_captcha_check.json @@ -0,0 +1,6 @@ +{ + "description": "ok", + "code": 200, + "error": false, + "data": null +} diff --git a/test/responses/auth/auth_token.json b/test/responses/auth/auth_token.json index 477632a0..d316251c 100644 --- a/test/responses/auth/auth_token.json +++ b/test/responses/auth/auth_token.json @@ -3,5 +3,7 @@ "token_type": "Bearer", "expires_in": 28799, "refresh_token": "another_token_string", - "scope": "authenticate_user vehicle_data remote_services" -} \ No newline at end of file + "scope": "authenticate_user vehicle_data remote_services", + "usid": "DUMMY", + "gcid": "DUMMY" +} diff --git a/test/test_account.py b/test/test_account.py index c7601069..fe86815f 100644 --- a/test/test_account.py +++ b/test/test_account.py @@ -107,6 +107,17 @@ def account_mock(): router.get("/eadrax-coas/v1/cop/publickey").respond( 200, json=load_response(RESPONSE_DIR / "auth" / "auth_cn_publickey.json") ) + router.post("/eadrax-coas/v2/cop/slider-captcha").respond( + 200, json=load_response(RESPONSE_DIR / "auth" / "auth_slider_captcha.json") + ) + + router.post("/eadrax-coas/v1/cop/check-captcha").mock( + side_effect=[ + httpx.Response(422), + httpx.Response(201, json=load_response(RESPONSE_DIR / "auth" / "auth_slider_captcha_check.json")), + ] + ) + router.post("/eadrax-coas/v2/login/pwd").respond( 200, json=load_response(RESPONSE_DIR / "auth" / "auth_cn_login_pwd.json") )