
Commit

Merge branch 'release/0.2.1'
s3rius committed Jun 17, 2024
2 parents 2238311 + dd73627 commit 3074403
Showing 6 changed files with 620 additions and 582 deletions.
1,132 changes: 578 additions & 554 deletions poetry.lock

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions pyproject.toml
@@ -3,7 +3,7 @@ name = "taskiq-aio-kafka"
description = "Kafka broker for taskiq"
authors = ["Taskiq team <taskiq@no-reply.com>"]
maintainers = ["Taskiq team <taskiq@no-reply.com>"]
version = "0.2.0"
version = "0.2.1"
readme = "README.md"
license = "LICENSE"
classifiers = [
@@ -24,27 +24,27 @@ keywords = ["taskiq", "tasks", "distributed", "async", "kafka", "aiokafka"]
packages = [{ include = "taskiq_aio_kafka" }]

[tool.poetry.dependencies]
python = "^3.8.1"
python = "^3.9"
taskiq = "^0"
aiokafka = "^0.8.0"
pydantic = "^1.10.7"
aiokafka = "^0.10.0"
kafka-python = "^2.0.2"

[tool.poetry.group.dev.dependencies]
pytest = "^7.1.2"
flake8 = "^6"
isort = "^5.10.1"
mypy = "^1.2.0"
pre-commit = "^2.20.0"
yesqa = "^1.3.0"
autoflake = "^1.4"
wemake-python-styleguide = "^0.18.0"
coverage = "^6.4.2"
pytest-cov = "^3.0.0"
mock = "^4.0.3"
anyio = "^3.6.1"
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
types-mock = "^4.0.15"
black = "^23.1.0"
pytest = "^8.2.2"
flake8 = "^7"
isort = "^5.13.2"
mypy = "^1.10.0"
pre-commit = "^3.7.1"
yesqa = "^1.5.0"
autoflake = "^2.3.1"
wemake-python-styleguide = "^0.19.2"
coverage = "^7.5.3"
pytest-cov = "^5.0.0"
mock = "^5.1.0"
anyio = "^4.4.0"
pytest-xdist = { version = "^3.6.1", extras = ["psutil"] }
types-mock = "^5.1.0"
black = "^24.4.2"

[tool.mypy]
strict = true
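Note on the dependency changes above: the minimum Python moves from 3.8.1 to 3.9, aiokafka is bumped to the 0.10.x line with kafka-python now declared explicitly, and the direct pydantic pin is dropped (compatibility is handled through taskiq.compat in the broker changes below). A minimal, hypothetical pre-flight check against the new constraints, assuming a standard installation:

import sys
from importlib.metadata import PackageNotFoundError, version

# Mirror the constraints from pyproject.toml: python ^3.9, aiokafka ^0.10.0,
# kafka-python ^2.0.2. This is only an illustrative check, not project code.
assert sys.version_info >= (3, 9), "taskiq-aio-kafka 0.2.1 requires Python 3.9+"
for package in ("taskiq", "aiokafka", "kafka-python"):
    try:
        print(f"{package}=={version(package)}")
    except PackageNotFoundError:
        print(f"{package} is not installed")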
1 change: 1 addition & 0 deletions taskiq_aio_kafka/__init__.py
@@ -1,4 +1,5 @@
"""Taskiq integration with aiokafka."""

from taskiq_aio_kafka.broker import AioKafkaBroker

__all__ = ["AioKafkaBroker"]
19 changes: 17 additions & 2 deletions taskiq_aio_kafka/broker.py
@@ -4,8 +4,11 @@

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.partitioner.default import DefaultPartitioner
from taskiq import AsyncResultBackend, BrokerMessage
from taskiq.abc.broker import AsyncBroker
+from taskiq.compat import model_dump

from taskiq_aio_kafka.exceptions import WrongAioKafkaBrokerParametersError
from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters
@@ -146,19 +149,31 @@ async def startup(self) -> None:
new_topics=[self._kafka_topic],
validate_only=False,
)

+partitioner = self._aiokafka_producer_params.partitioner or DefaultPartitioner()
+producer_kwargs = model_dump(self._aiokafka_producer_params)
+producer_kwargs["partitioner"] = partitioner
self._aiokafka_producer = AIOKafkaProducer(
bootstrap_servers=self._bootstrap_servers,
loop=self._loop,
-**self._aiokafka_producer_params.dict(),
+**producer_kwargs,
)
await self._aiokafka_producer.start()

if self.is_worker_process:
+partition_assignment_strategy = (
+self._aiokafka_consumer_params.partition_assignment_strategy
+or (RoundRobinPartitionAssignor,)
+)
+consumer_kwargs = model_dump(self._aiokafka_consumer_params)
+consumer_kwargs["partition_assignment_strategy"] = (
+partition_assignment_strategy
+)
self._aiokafka_consumer = AIOKafkaConsumer(
self._kafka_topic.name,
bootstrap_servers=self._bootstrap_servers,
loop=self._loop,
-**self._aiokafka_consumer_params.dict(),
+**consumer_kwargs,
)

await self._aiokafka_consumer.start()
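Summary of the broker change: startup() now resolves the producer partitioner and the consumer partition assignment strategy itself, falling back to DefaultPartitioner() and (RoundRobinPartitionAssignor,) when the parameter models leave them as None, and it builds keyword arguments with taskiq.compat.model_dump() instead of the pydantic-v1-only .dict(). Caller-side usage is unchanged; a minimal sketch (the constructor argument and the task API are assumed from taskiq conventions, not taken from this diff):

import asyncio

from taskiq_aio_kafka import AioKafkaBroker

# Assumed constructor signature; see the project README for the exact API.
broker = AioKafkaBroker(bootstrap_servers="localhost:9092")


@broker.task
async def add(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    # As of 0.2.1 the partitioner / partition_assignment_strategy defaults are
    # filled in here, inside startup(), rather than in the parameter models.
    await broker.startup()
    await add.kiq(1, 2)
    await broker.shutdown()


asyncio.run(main())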
6 changes: 2 additions & 4 deletions taskiq_aio_kafka/models.py
@@ -1,8 +1,6 @@
from typing import Any, Callable, Optional, Union

from aiokafka import __version__
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.partitioner.default import DefaultPartitioner
from pydantic import BaseModel


@@ -18,7 +16,7 @@ class KafkaProducerParameters(BaseModel):
value_serializer: Optional[Callable[..., bytes]] = None
compression_type: Optional[str] = None
max_batch_size: int = 16384
-partitioner: Callable[..., Any] = DefaultPartitioner()
+partitioner: Optional[Callable[..., Any]] = None
max_request_size: int = 1048576
linger_ms: int = 0
send_backoff_ms: int = 100
@@ -55,7 +53,7 @@ class KafkaConsumerParameters(BaseModel):
auto_commit_interval_ms: int = 5000
check_crcs: bool = True
metadata_max_age_ms: int = 5 * 60 * 1000
-partition_assignment_strategy: Any = (RoundRobinPartitionAssignor,)
+partition_assignment_strategy: Any = None
max_poll_interval_ms: int = 300000
rebalance_timeout_ms: Optional[int] = None
session_timeout_ms: int = 10000
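With the defaults switched to None, models.py no longer imports DefaultPartitioner or RoundRobinPartitionAssignor; the concrete fallbacks now live in the broker's startup(). A minimal sketch of that pattern with a trimmed stand-in model (not the library's actual class):

from typing import Any, Callable, Optional

from pydantic import BaseModel
from taskiq.compat import model_dump  # works on both pydantic v1 and v2


class ProducerParameters(BaseModel):
    """Trimmed stand-in for KafkaProducerParameters, for illustration only."""

    max_batch_size: int = 16384
    partitioner: Optional[Callable[..., Any]] = None  # resolved lazily


def build_producer_kwargs(params: ProducerParameters) -> dict:
    kwargs = model_dump(params)
    if kwargs["partitioner"] is None:
        # Import and instantiate the default only when the producer is built,
        # so merely defining the model never touches the kafka package.
        from kafka.partitioner.default import DefaultPartitioner

        kwargs["partitioner"] = DefaultPartitioner()
    return kwargs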
6 changes: 3 additions & 3 deletions tests/test_broker.py
@@ -69,9 +69,9 @@ async def test_startup(
assert broker_without_arguments._aiokafka_producer
assert broker_without_arguments._kafka_admin_client

-all_kafka_topics: List[
-str
-] = broker_without_arguments._kafka_admin_client.list_topics()
+all_kafka_topics: List[str] = (
+broker_without_arguments._kafka_admin_client.list_topics()
+)

assert broker_without_arguments._kafka_topic.name in all_kafka_topics

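The test change above is formatting only; the assertion still verifies that startup() registers the broker's topic via the KafkaAdminClient. Roughly the same check can be run by hand against a live cluster (address assumed):

from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
# After broker.startup() has run, the broker's configured topic should be listed.
print(admin.list_topics())
admin.close()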
