Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Zabbix 7.2 authentication #276

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `login` command re-enabled. It should now behave consistently regardless of configuration and environment.
- `login` command description updated to reflect its intended usage.
- Now uses new header-based authentication for Zabbix >=6.4 instead of passing it as a part of the request body.

### Fixed

- Authentication for Zabbix 7.2 and later. The application now correctly determines how to pass in authentication data based on the Zabbix version.

## [3.4.1](https://github.com/unioslo/zabbix-cli/releases/tag/3.4.1) - 2024-12-04

Expand Down
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pytest
import typer
from packaging.version import Version
from typer.testing import CliRunner
from zabbix_cli.app import StatefulApp
from zabbix_cli.config.model import Config
Expand Down Expand Up @@ -97,9 +98,13 @@ def config(tmp_path: Path) -> Iterator[Config]:


@pytest.fixture(name="zabbix_client")
def zabbix_client() -> Iterator[ZabbixAPI]:
def zabbix_client(monkeypatch: pytest.MonkeyPatch) -> Iterator[ZabbixAPI]:
config = Config.sample_config()
client = ZabbixAPI.from_config(config)

# Patch the version check
monkeypatch.setattr(client, "api_version", lambda: Version("7.0.0"))

yield client


Expand Down
6 changes: 1 addition & 5 deletions tests/pyzabbix/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import pytest
from inline_snapshot import snapshot
from packaging.version import Version
from zabbix_cli.exceptions import ZabbixAPILoginError
from zabbix_cli.exceptions import ZabbixAPILogoutError
from zabbix_cli.pyzabbix.client import ZabbixAPI
Expand Down Expand Up @@ -76,15 +75,12 @@ def test_add_param(inp: Any, subkey: str, value: Any, expect: dict[str, Any]) ->
# Check in-place modification


def test_login_fails(zabbix_client: ZabbixAPI, monkeypatch: pytest.MonkeyPatch) -> None:
def test_login_fails(zabbix_client: ZabbixAPI) -> None:
zabbix_client.set_url("http://some-url-that-will-fail.gg")
assert zabbix_client.url == snapshot(
"http://some-url-that-will-fail.gg/api_jsonrpc.php"
)

# Patch the version check
monkeypatch.setattr(zabbix_client, "api_version", lambda: Version("7.0.0"))

with pytest.raises(ZabbixAPILoginError) as exc_info:
zabbix_client.login(user="username", password="password")

Expand Down
59 changes: 19 additions & 40 deletions zabbix_cli/pyzabbix/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
from collections.abc import MutableMapping
from datetime import datetime
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
Expand Down Expand Up @@ -234,7 +235,7 @@ def __init__(
server: str = "http://localhost/zabbix",
timeout: Optional[int] = None,
verify_ssl: bool = True,
):
) -> None:
"""Parameters:
server: Base URI for zabbix web interface (omitting /api_jsonrpc.php)
session: optional pre-configured requests.Session instance
Expand All @@ -254,9 +255,6 @@ def __init__(
# Cache
self.cache = ZabbixCache(self)

# Attributes for properties
self._version: Optional[Version] = None

def _get_url(self, server: str) -> str:
"""Format a URL for the Zabbix API."""
return f"{server}/api_jsonrpc.php"
Expand Down Expand Up @@ -344,6 +342,9 @@ def login(
self.auth = str(auth) if auth else ""
self.use_api_token = False

# Check if the auth token we obtained or specified is valid
# XXX: should revert auth token to what it was before this method
# was called if token is invalid.
self.ensure_authenticated()
return self.auth

Expand Down Expand Up @@ -385,24 +386,19 @@ def confimport(self, format: ExportFormat, source: str, rules: ImportRules) -> A
},
).result

# TODO (pederhan): Use functools.cachedproperty when we drop 3.7 support
@property
@cached_property
def version(self) -> Version:
"""Alternate version of api_version() that caches version info
as a Version object.
"""
if self._version is None:
self._version = self.api_version()
return self._version
"""`api_version()` exposed as a cached property."""
return self.api_version()

def api_version(self) -> Version:
"""Get the version of the Zabbix API as a Version object."""
try:
return Version(self.apiinfo.version())
except ZabbixAPIException as e:
raise ZabbixAPIException(f"Failed to get Zabbix API version: {e}") from e
raise ZabbixAPIException("Failed to get Zabbix version from API") from e
except InvalidVersion as e:
raise ZabbixAPIException(f"Invalid Zabbix API version: {e}") from e
raise ZabbixAPIException("Got invalid Zabbix version from API") from e

def do_request(
self, method: str, params: Optional[ParamsType] = None
Expand All @@ -415,16 +411,22 @@ def do_request(
"params": params,
"id": self.id,
}
request_headers: dict[str, str] = {}

# We don't have to pass the auth token if asking for the apiinfo.version
if self.auth and method != "apiinfo.version":
request_json["auth"] = self.auth
# TODO: ensure we have auth token if method requires it
if self.auth and method != "apiinfo.version":
if self.version.release >= (6, 4, 0):
request_headers["Authorization"] = f"Bearer {self.auth}"
else:
request_json["auth"] = self.auth

logger.debug("Sending %s to %s", method, self.url)

try:
response = self.session.post(self.url, json=request_json)
response = self.session.post(
self.url, json=request_json, headers=request_headers
)
except Exception as e:
raise ZabbixAPIRequestError(
f"Failed to send request to {self.url} ({method}) with params {params}",
Expand Down Expand Up @@ -508,29 +510,6 @@ def populate_cache(self) -> None:
# NOTE: Must be manually invoked. Can we do this in a thread?
self.cache.populate()

# TODO: delete when we don't need to test v2 code anymore
def get_hostgroup_name(self, hostgroup_id: str) -> str:
"""Returns the name of a host group given its ID."""
hostgroup_name = self.cache.get_hostgroup_name(hostgroup_id)
if hostgroup_name:
return hostgroup_name
resp = self.hostgroup.get(filter={"groupid": hostgroup_id}, output=["name"])
if not resp:
raise ZabbixNotFoundError(f"HostGroup with ID {hostgroup_id} not found")
# TODO add result to cache
return resp[0]["name"]

def get_hostgroup_id(self, hostgroup_name: str) -> str:
"""Returns the ID of a host group given its name."""
hostgroup_id = self.cache.get_hostgroup_id(hostgroup_name)
if hostgroup_id:
return hostgroup_id
resp = self.hostgroup.get(filter={"name": hostgroup_name}, output=["name"])
if not resp:
raise ZabbixNotFoundError(f"Host group {hostgroup_name!r} not found")
# TODO add result to cache
return resp[0]["groupid"]

def get_hostgroup(
self,
name_or_id: str,
Expand Down
Loading