From e0076ad9bbf13bcf287b834501470330377f403e Mon Sep 17 00:00:00 2001 From: Richard Kroegel <42204099+rikroe@users.noreply.github.com> Date: Sun, 1 Dec 2024 15:42:47 +0100 Subject: [PATCH] Reset session_id after 14 days (#699) * Reset session_id after 14 days * Update docs * Fix mypy --- bimmer_connected/cli.py | 56 ++++++++++++++------ bimmer_connected/tests/test_cli.py | 83 ++++++++++++++++++++++++++++-- docs/source/captcha.rst | 3 +- 3 files changed, 122 insertions(+), 20 deletions(-) diff --git a/bimmer_connected/cli.py b/bimmer_connected/cli.py index 809e253..8d41e2e 100644 --- a/bimmer_connected/cli.py +++ b/bimmer_connected/cli.py @@ -3,12 +3,12 @@ import argparse import asyncio -import contextlib import json import logging import sys import time from pathlib import Path +from typing import Dict, Optional import httpx @@ -325,6 +325,44 @@ def _add_position_arguments(parser: argparse.ArgumentParser): parser.set_defaults(func=get_status) +def load_oauth_store_from_file(oauth_store: Path, account: MyBMWAccount) -> Dict: + """Load the OAuth details from a file if it exists.""" + if not oauth_store.exists(): + return {} + try: + oauth_data = json.loads(oauth_store.read_text()) + except json.JSONDecodeError: + return {} + + session_id_timestamp = oauth_data.pop("session_id_timestamp", None) + # Pop session_id every 14 days to it gets recreated + if (time.time() - (session_id_timestamp or 0)) > 14 * 24 * 60 * 60: + oauth_data.pop("session_id", None) + session_id_timestamp = None + + account.set_refresh_token(**oauth_data) + + return {**oauth_data, "session_id_timestamp": session_id_timestamp} + + +def store_oauth_store_to_file( + oauth_store: Path, account: MyBMWAccount, session_id_timestamp: Optional[float] = None +) -> None: + """Store the OAuth details to a file.""" + oauth_store.parent.mkdir(parents=True, exist_ok=True) + oauth_store.write_text( + json.dumps( + { + "refresh_token": account.config.authentication.refresh_token, + "gcid": account.config.authentication.gcid, + "access_token": account.config.authentication.access_token, + "session_id": account.config.authentication.session_id, + "session_id_timestamp": session_id_timestamp or time.time(), + } + ), + ) + + def main(): """Get arguments from parser and run function in event loop.""" parser = main_parser() @@ -338,9 +376,7 @@ def main(): args.username, args.password, get_region_from_name(args.region), hcaptcha_token=args.captcha_token ) - if args.oauth_store.exists(): - with contextlib.suppress(json.JSONDecodeError): - account.set_refresh_token(**json.loads(args.oauth_store.read_text())) + oauth_store_data = load_oauth_store_from_file(args.oauth_store, account) loop = asyncio.get_event_loop() try: @@ -351,17 +387,7 @@ def main(): finally: # Ensure that the OAuth2 tokens are stored even if an exception occurred if not args.disable_oauth_store: - args.oauth_store.parent.mkdir(parents=True, exist_ok=True) - args.oauth_store.write_text( - json.dumps( - { - "refresh_token": account.config.authentication.refresh_token, - "gcid": account.config.authentication.gcid, - "access_token": account.config.authentication.access_token, - "session_id": account.config.authentication.session_id, - } - ), - ) + store_oauth_store_to_file(args.oauth_store, account, oauth_store_data.get("session_id_timestamp")) if __name__ == "__main__": diff --git a/bimmer_connected/tests/test_cli.py b/bimmer_connected/tests/test_cli.py index 3a3ed89..d3ab5a6 100644 --- a/bimmer_connected/tests/test_cli.py +++ b/bimmer_connected/tests/test_cli.py @@ -2,11 +2,13 @@ import json import subprocess import sys +import time from pathlib import Path import httpx import pytest import respx +import time_machine import bimmer_connected.cli from bimmer_connected import __version__ as VERSION @@ -153,10 +155,10 @@ def test_oauth_store_credentials(cli_home_dir: Path, bmw_fixture: respx.Router): assert (cli_home_dir / ".bimmer_connected.json").exists() is True oauth_storage = json.loads((cli_home_dir / ".bimmer_connected.json").read_text()) - assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id"} + assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id", "session_id_timestamp"} -# @pytest.mark.usefixtures("bmw_fixture") +@time_machine.travel("2021-11-28 21:28:59 +0000") @pytest.mark.usefixtures("cli_home_dir") def test_oauth_load_credentials(cli_home_dir: Path, bmw_fixture: respx.Router): """Test loading and storing the oauth credentials.""" @@ -166,6 +168,7 @@ def test_oauth_load_credentials(cli_home_dir: Path, bmw_fixture: respx.Router): "refresh_token": "demo_refresh_token", "gcid": "demo_gcid", "session_id": "demo_session_id", + "session_id_timestamp": 1638134000, } (cli_home_dir / ".bimmer_connected.json").write_text(json.dumps(demo_oauth_data)) @@ -182,7 +185,7 @@ def test_oauth_load_credentials(cli_home_dir: Path, bmw_fixture: respx.Router): assert (cli_home_dir / ".bimmer_connected.json").exists() is True oauth_storage = json.loads((cli_home_dir / ".bimmer_connected.json").read_text()) - assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id"} + assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id", "session_id_timestamp"} # no change as the old tokens are still valid assert oauth_storage["refresh_token"] == demo_oauth_data["refresh_token"] @@ -191,6 +194,78 @@ def test_oauth_load_credentials(cli_home_dir: Path, bmw_fixture: respx.Router): assert oauth_storage["session_id"] == demo_oauth_data["session_id"] +@time_machine.travel("2021-11-28 21:28:59 +0000") +@pytest.mark.usefixtures("cli_home_dir") +def test_oauth_load_credentials_old_session_id(cli_home_dir: Path, bmw_fixture: respx.Router): + """Test loading and storing the oauth credentials and getting a new session_id.""" + + demo_oauth_data = { + "access_token": "demo_access_token", + "refresh_token": "demo_refresh_token", + "gcid": "demo_gcid", + "session_id": "demo_session_id", + "session_id_timestamp": 1636838939, # 2021-11-13 21:28:59 +0000 + } + + (cli_home_dir / ".bimmer_connected.json").write_text(json.dumps(demo_oauth_data)) + assert (cli_home_dir / ".bimmer_connected.json").exists() is True + + sys.argv = ["bimmerconnected", "status", *ARGS_USER_PW_REGION] + + bimmer_connected.cli.main() + + assert (cli_home_dir / ".bimmer_connected.json").exists() is True + oauth_storage = json.loads((cli_home_dir / ".bimmer_connected.json").read_text()) + + # no change as the old tokens are still valid + assert oauth_storage["refresh_token"] == demo_oauth_data["refresh_token"] + assert oauth_storage["access_token"] == demo_oauth_data["access_token"] + assert oauth_storage["gcid"] == demo_oauth_data["gcid"] + # but we have a new session_id and session_id_timestamp + assert oauth_storage["session_id"] != demo_oauth_data["session_id"] + assert oauth_storage["session_id_timestamp"] == pytest.approx(time.time(), abs=5) + + +@time_machine.travel("2021-11-28 21:28:59 +0000") +@pytest.mark.usefixtures("cli_home_dir") +def test_oauth_store_credentials_on_error(cli_home_dir: Path, bmw_fixture: respx.Router): + """Test loading and storing the oauth credentials, even if a call errors out.""" + + demo_oauth_data = { + "access_token": "demo_access_token", + "refresh_token": "demo_refresh_token", + "gcid": "DUMMY", + "session_id": "demo_session_id", + "session_id_timestamp": 1638134000, + } + + (cli_home_dir / ".bimmer_connected.json").write_text(json.dumps(demo_oauth_data)) + assert (cli_home_dir / ".bimmer_connected.json").exists() is True + + vehicle_routes = bmw_fixture.pop("vehicles") + bmw_fixture.post("/eadrax-vcs/v5/vehicle-list", name="vehicles").mock( + side_effect=[ + httpx.Response(401, json=load_response(RESPONSE_DIR / "auth" / "auth_error_wrong_password.json")), + vehicle_routes.side_effect, # type: ignore[list-item] + httpx.Response(500), + ] + ) + + sys.argv = ["bimmerconnected", "--debug", "status", *ARGS_USER_PW_REGION] + with pytest.raises(SystemExit): + bimmer_connected.cli.main() + + assert bmw_fixture.routes["token"].call_count == 1 + + # Check that tokens are stored and a new refresh_token is saved + assert (cli_home_dir / ".bimmer_connected.json").exists() is True + oauth_storage = json.loads((cli_home_dir / ".bimmer_connected.json").read_text()) + assert oauth_storage["refresh_token"] == "another_token_string" + assert oauth_storage["access_token"] == "some_token_string" + assert oauth_storage["gcid"] == demo_oauth_data["gcid"] + assert oauth_storage["session_id"] == demo_oauth_data["session_id"] + + @pytest.mark.usefixtures("bmw_fixture") @pytest.mark.usefixtures("cli_home_dir") @pytest.mark.parametrize( @@ -222,7 +297,7 @@ def test_oauth_store_credentials_path(cli_home_dir: Path, tmp_path_factory: pyte oauth_storage = json.loads((new_folder / filepath).read_text()) - assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id"} + assert set(oauth_storage.keys()) == {"access_token", "refresh_token", "gcid", "session_id", "session_id_timestamp"} @pytest.mark.usefixtures("bmw_fixture") diff --git a/docs/source/captcha.rst b/docs/source/captcha.rst index eb6b5da..9301e23 100644 --- a/docs/source/captcha.rst +++ b/docs/source/captcha.rst @@ -60,7 +60,8 @@ Using the Python API 3. **Subsequent Logins**: Ensure to keep the current :code:`MyBMWAccount` instance in memory to avoid having to solve the captcha again. - For storing the data across restarts, an example implementation can be found in `bimmerconnected.main() `_ (with :code:`args.oauth_store` being a :code:`pathlib.Path()` object). + For storing the data across restarts, an example implementation can be found in :code:`bimmerconnected.cli.main()` with + :code:`load_oauth_store_from_file()` and :code:`store_oauth_store_to_file()`. If you are running this script inside another system (e.g. domoticz), you can also store and read the information using their native tools - it does not have to be a JSON file, as long as the data is stored and read correctly.