diff --git a/CHANGELOG b/CHANGELOG index 6ee0d360..c4198520 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `login` command re-enabled. It should now behave consistently regardless of configuration and environment. - `login` command description updated to reflect its intended usage. +- Now uses new header-based authentication for Zabbix >=6.4 instead of passing it as a part of the request body. + +### Fixed + +- Authentication for Zabbix 7.2 and later. The application now correctly determines how to pass in authentication data based on the Zabbix version. ## [3.4.1](https://github.com/unioslo/zabbix-cli/releases/tag/3.4.1) - 2024-12-04 diff --git a/tests/conftest.py b/tests/conftest.py index 7e281dc0..8621e863 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import pytest import typer +from packaging.version import Version from typer.testing import CliRunner from zabbix_cli.app import StatefulApp from zabbix_cli.config.model import Config @@ -97,9 +98,13 @@ def config(tmp_path: Path) -> Iterator[Config]: @pytest.fixture(name="zabbix_client") -def zabbix_client() -> Iterator[ZabbixAPI]: +def zabbix_client(monkeypatch: pytest.MonkeyPatch) -> Iterator[ZabbixAPI]: config = Config.sample_config() client = ZabbixAPI.from_config(config) + + # Patch the version check + monkeypatch.setattr(client, "api_version", lambda: Version("7.0.0")) + yield client diff --git a/tests/pyzabbix/test_client.py b/tests/pyzabbix/test_client.py index f1806f59..88de90a9 100644 --- a/tests/pyzabbix/test_client.py +++ b/tests/pyzabbix/test_client.py @@ -4,7 +4,6 @@ import pytest from inline_snapshot import snapshot -from packaging.version import Version from zabbix_cli.exceptions import ZabbixAPILoginError from zabbix_cli.exceptions import ZabbixAPILogoutError from zabbix_cli.pyzabbix.client import ZabbixAPI @@ -76,15 +75,12 @@ def test_add_param(inp: Any, subkey: str, value: Any, expect: dict[str, Any]) -> # Check in-place modification -def test_login_fails(zabbix_client: ZabbixAPI, monkeypatch: pytest.MonkeyPatch) -> None: +def test_login_fails(zabbix_client: ZabbixAPI) -> None: zabbix_client.set_url("http://some-url-that-will-fail.gg") assert zabbix_client.url == snapshot( "http://some-url-that-will-fail.gg/api_jsonrpc.php" ) - # Patch the version check - monkeypatch.setattr(zabbix_client, "api_version", lambda: Version("7.0.0")) - with pytest.raises(ZabbixAPILoginError) as exc_info: zabbix_client.login(user="username", password="password") diff --git a/zabbix_cli/pyzabbix/client.py b/zabbix_cli/pyzabbix/client.py index 59d43a39..2700fd29 100644 --- a/zabbix_cli/pyzabbix/client.py +++ b/zabbix_cli/pyzabbix/client.py @@ -17,6 +17,7 @@ import logging from collections.abc import MutableMapping from datetime import datetime +from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING from typing import Any @@ -234,7 +235,7 @@ def __init__( server: str = "http://localhost/zabbix", timeout: Optional[int] = None, verify_ssl: bool = True, - ): + ) -> None: """Parameters: server: Base URI for zabbix web interface (omitting /api_jsonrpc.php) session: optional pre-configured requests.Session instance @@ -254,9 +255,6 @@ def __init__( # Cache self.cache = ZabbixCache(self) - # Attributes for properties - self._version: Optional[Version] = None - def _get_url(self, server: str) -> str: """Format a URL for the Zabbix API.""" return f"{server}/api_jsonrpc.php" @@ -344,6 +342,9 @@ def login( self.auth = str(auth) if auth else "" self.use_api_token = False + # Check if the auth token we obtained or specified is valid + # XXX: should revert auth token to what it was before this method + # was called if token is invalid. self.ensure_authenticated() return self.auth @@ -385,24 +386,19 @@ def confimport(self, format: ExportFormat, source: str, rules: ImportRules) -> A }, ).result - # TODO (pederhan): Use functools.cachedproperty when we drop 3.7 support - @property + @cached_property def version(self) -> Version: - """Alternate version of api_version() that caches version info - as a Version object. - """ - if self._version is None: - self._version = self.api_version() - return self._version + """`api_version()` exposed as a cached property.""" + return self.api_version() def api_version(self) -> Version: """Get the version of the Zabbix API as a Version object.""" try: return Version(self.apiinfo.version()) except ZabbixAPIException as e: - raise ZabbixAPIException(f"Failed to get Zabbix API version: {e}") from e + raise ZabbixAPIException("Failed to get Zabbix version from API") from e except InvalidVersion as e: - raise ZabbixAPIException(f"Invalid Zabbix API version: {e}") from e + raise ZabbixAPIException("Got invalid Zabbix version from API") from e def do_request( self, method: str, params: Optional[ParamsType] = None @@ -415,16 +411,22 @@ def do_request( "params": params, "id": self.id, } + request_headers: dict[str, str] = {} # We don't have to pass the auth token if asking for the apiinfo.version - if self.auth and method != "apiinfo.version": - request_json["auth"] = self.auth # TODO: ensure we have auth token if method requires it + if self.auth and method != "apiinfo.version": + if self.version.release >= (6, 4, 0): + request_headers["Authorization"] = f"Bearer {self.auth}" + else: + request_json["auth"] = self.auth logger.debug("Sending %s to %s", method, self.url) try: - response = self.session.post(self.url, json=request_json) + response = self.session.post( + self.url, json=request_json, headers=request_headers + ) except Exception as e: raise ZabbixAPIRequestError( f"Failed to send request to {self.url} ({method}) with params {params}", @@ -508,29 +510,6 @@ def populate_cache(self) -> None: # NOTE: Must be manually invoked. Can we do this in a thread? self.cache.populate() - # TODO: delete when we don't need to test v2 code anymore - def get_hostgroup_name(self, hostgroup_id: str) -> str: - """Returns the name of a host group given its ID.""" - hostgroup_name = self.cache.get_hostgroup_name(hostgroup_id) - if hostgroup_name: - return hostgroup_name - resp = self.hostgroup.get(filter={"groupid": hostgroup_id}, output=["name"]) - if not resp: - raise ZabbixNotFoundError(f"HostGroup with ID {hostgroup_id} not found") - # TODO add result to cache - return resp[0]["name"] - - def get_hostgroup_id(self, hostgroup_name: str) -> str: - """Returns the ID of a host group given its name.""" - hostgroup_id = self.cache.get_hostgroup_id(hostgroup_name) - if hostgroup_id: - return hostgroup_id - resp = self.hostgroup.get(filter={"name": hostgroup_name}, output=["name"]) - if not resp: - raise ZabbixNotFoundError(f"Host group {hostgroup_name!r} not found") - # TODO add result to cache - return resp[0]["groupid"] - def get_hostgroup( self, name_or_id: str,