Commit f99a234

Merge pull request #3 from MrPitonych/feature/updated-pydantic-to-v2
2 parents: eff4c57 + 66631f5

6 files changed: +619 -581 lines

poetry.lock
+578 -554 (generated lock file; diff not rendered by default)

pyproject.toml
+18 -18

@@ -24,27 +24,27 @@ keywords = ["taskiq", "tasks", "distributed", "async", "kafka", "aiokafka"]
 packages = [{ include = "taskiq_aio_kafka" }]
 
 [tool.poetry.dependencies]
-python = "^3.8.1"
+python = "^3.9"
 taskiq = "^0"
-aiokafka = "^0.8.0"
-pydantic = "^1.10.7"
+aiokafka = "^0.10.0"
+kafka-python = "^2.0.2"
 
 [tool.poetry.group.dev.dependencies]
-pytest = "^7.1.2"
-flake8 = "^6"
-isort = "^5.10.1"
-mypy = "^1.2.0"
-pre-commit = "^2.20.0"
-yesqa = "^1.3.0"
-autoflake = "^1.4"
-wemake-python-styleguide = "^0.18.0"
-coverage = "^6.4.2"
-pytest-cov = "^3.0.0"
-mock = "^4.0.3"
-anyio = "^3.6.1"
-pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
-types-mock = "^4.0.15"
-black = "^23.1.0"
+pytest = "^8.2.2"
+flake8 = "^7"
+isort = "^5.13.2"
+mypy = "^1.10.0"
+pre-commit = "^3.7.1"
+yesqa = "^1.5.0"
+autoflake = "^2.3.1"
+wemake-python-styleguide = "^0.19.2"
+coverage = "^7.5.3"
+pytest-cov = "^5.0.0"
+mock = "^5.1.0"
+anyio = "^4.4.0"
+pytest-xdist = { version = "^3.6.1", extras = ["psutil"] }
+types-mock = "^5.1.0"
+black = "^24.4.2"
 
 [tool.mypy]
 strict = true
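
Note that pydantic is no longer a direct dependency: its version is now resolved transitively (via taskiq), while kafka-python is added explicitly for the admin-client, partitioner, and assignor imports used in broker.py below. A quick, purely illustrative sanity check of the resulting environment (assuming `poetry install` has been run in the project's virtualenv):

import aiokafka
import kafka
import pydantic  # not pinned here anymore; pulled in transitively (e.g. via taskiq)

print(aiokafka.__version__)  # should satisfy ^0.10.0
print(kafka.__version__)     # kafka-python, should satisfy ^2.0.2
print(pydantic.VERSION)      # whichever major the resolver picked (v1 or v2)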

taskiq_aio_kafka/__init__.py
+1

@@ -1,4 +1,5 @@
 """Taskiq integration with aiokafka."""
+
 from taskiq_aio_kafka.broker import AioKafkaBroker
 
 __all__ = ["AioKafkaBroker"]

taskiq_aio_kafka/broker.py
+17 -2

@@ -4,8 +4,11 @@
 
 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
 from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.partitioner.default import DefaultPartitioner
 from taskiq import AsyncResultBackend, BrokerMessage
 from taskiq.abc.broker import AsyncBroker
+from taskiq.compat import model_dump
 
 from taskiq_aio_kafka.exceptions import WrongAioKafkaBrokerParametersError
 from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters
@@ -146,19 +149,31 @@ async def startup(self) -> None:
                 new_topics=[self._kafka_topic],
                 validate_only=False,
             )
+
+        partitioner = self._aiokafka_producer_params.partitioner or DefaultPartitioner()
+        producer_kwargs = model_dump(self._aiokafka_producer_params)
+        producer_kwargs["partitioner"] = partitioner
         self._aiokafka_producer = AIOKafkaProducer(
             bootstrap_servers=self._bootstrap_servers,
             loop=self._loop,
-            **self._aiokafka_producer_params.dict(),
+            **producer_kwargs,
         )
         await self._aiokafka_producer.start()
 
         if self.is_worker_process:
+            partition_assignment_strategy = (
+                self._aiokafka_consumer_params.partition_assignment_strategy
+                or (RoundRobinPartitionAssignor,)
+            )
+            consumer_kwargs = model_dump(self._aiokafka_consumer_params)
+            consumer_kwargs["partition_assignment_strategy"] = (
+                partition_assignment_strategy
+            )
             self._aiokafka_consumer = AIOKafkaConsumer(
                 self._kafka_topic.name,
                 bootstrap_servers=self._bootstrap_servers,
                 loop=self._loop,
-                **self._aiokafka_consumer_params.dict(),
+                **consumer_kwargs,
             )
 
             await self._aiokafka_consumer.start()
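
The broker no longer calls the pydantic-v1-only `.dict()` method; it goes through `model_dump` from `taskiq.compat`, which works on both pydantic majors, and it now fills in the partitioner / partition-assignment defaults itself during startup(). As a rough sketch of what such a compatibility helper does (an assumption for illustration, not the actual taskiq.compat code):

from typing import Any, Dict

from pydantic import VERSION, BaseModel

PYDANTIC_V2 = VERSION.startswith("2.")


def model_dump(model: BaseModel) -> Dict[str, Any]:
    """Dump model fields to a plain dict on either pydantic major."""
    if PYDANTIC_V2:
        return model.model_dump()  # pydantic v2 API
    return model.dict()  # pydantic v1 API

Resolving DefaultPartitioner() and (RoundRobinPartitionAssignor,) here, rather than as field defaults, keeps the pydantic models free of callable default values (see the models.py change below).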

taskiq_aio_kafka/models.py
+2 -4

@@ -1,8 +1,6 @@
 from typing import Any, Callable, Optional, Union
 
 from aiokafka import __version__
-from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.partitioner.default import DefaultPartitioner
 from pydantic import BaseModel
 
 
@@ -18,7 +16,7 @@ class KafkaProducerParameters(BaseModel):
     value_serializer: Optional[Callable[..., bytes]] = None
     compression_type: Optional[str] = None
     max_batch_size: int = 16384
-    partitioner: Callable[..., Any] = DefaultPartitioner()
+    partitioner: Optional[Callable[..., Any]] = None
     max_request_size: int = 1048576
     linger_ms: int = 0
     send_backoff_ms: int = 100
@@ -55,7 +53,7 @@ class KafkaConsumerParameters(BaseModel):
     auto_commit_interval_ms: int = 5000
     check_crcs: bool = True
     metadata_max_age_ms: int = 5 * 60 * 1000
-    partition_assignment_strategy: Any = (RoundRobinPartitionAssignor,)
+    partition_assignment_strategy: Any = None
     max_poll_interval_ms: int = 300000
     rebalance_timeout_ms: Optional[int] = None
     session_timeout_ms: int = 10000
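
Because these field defaults are now None, the old behaviour is preserved only through the fallback applied in startup(). A minimal illustration of that resolution, using the names from this diff (the surrounding setup is assumed):

from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.partitioner.default import DefaultPartitioner

from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters

producer_params = KafkaProducerParameters()  # partitioner defaults to None
consumer_params = KafkaConsumerParameters()  # partition_assignment_strategy defaults to None

# The same fallbacks the broker now applies in startup():
partitioner = producer_params.partitioner or DefaultPartitioner()
assignment_strategy = (
    consumer_params.partition_assignment_strategy or (RoundRobinPartitionAssignor,)
)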

tests/test_broker.py
+3 -3

@@ -69,9 +69,9 @@ async def test_startup(
     assert broker_without_arguments._aiokafka_producer
     assert broker_without_arguments._kafka_admin_client
 
-    all_kafka_topics: List[
-        str
-    ] = broker_without_arguments._kafka_admin_client.list_topics()
+    all_kafka_topics: List[str] = (
+        broker_without_arguments._kafka_admin_client.list_topics()
+    )
 
     assert broker_without_arguments._kafka_topic.name in all_kafka_topics
 