Skip to content

Commit

Permalink
Use murmur partitioner on librdkafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ifnesi committed Jun 9, 2023
1 parent 10c4f34 commit e4bd009
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 37 deletions.
8 changes: 1 addition & 7 deletions msvc_assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
get_system_config,
validate_cli_args,
log_event_received,
get_topic_partitions,
set_producer_consumer,
get_custom_partitioner,
)


Expand All @@ -60,7 +58,7 @@
CONSUME_TOPICS = [
SYS_CONFIG["kafka-topics"]["pizza_ordered"],
]
_, PRODUCER, CONSUMER, ADMIN_CLIENT = set_producer_consumer(
_, PRODUCER, CONSUMER, _ = set_producer_consumer(
kafka_config_file,
producer_extra_config={
"on_delivery": delivery_report,
Expand All @@ -71,9 +69,6 @@
"client.id": f"""{SYS_CONFIG["kafka-client-id"]["microservice_assembled"]}_{HOSTNAME}""",
},
)
CUSTOM_PARTITIONER = get_custom_partitioner()
PARTITIONS_STATUS = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_STATUS)
PARTITIONS_ASSEMBLED = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_ASSEMBLED)

# Set signal handler
GRACEFUL_SHUTDOWN = GracefulShutdown(consumer=CONSUMER)
Expand All @@ -97,7 +92,6 @@ def pizza_assembled(
"timestamp": timestamp_now(),
}
).encode(),
partition=CUSTOM_PARTITIONER(order_id.encode(), PARTITIONS_ASSEMBLED),
)
PRODUCER.flush()

Expand Down
8 changes: 1 addition & 7 deletions msvc_bake.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
get_system_config,
validate_cli_args,
log_event_received,
get_topic_partitions,
set_producer_consumer,
get_custom_partitioner,
)


Expand All @@ -58,7 +56,7 @@
CONSUME_TOPICS = [
SYS_CONFIG["kafka-topics"]["pizza_assembled"],
]
_, PRODUCER, CONSUMER, ADMIN_CLIENT = set_producer_consumer(
_, PRODUCER, CONSUMER, _ = set_producer_consumer(
kafka_config_file,
producer_extra_config={
"on_delivery": delivery_report,
Expand All @@ -69,9 +67,6 @@
"client.id": f"""{SYS_CONFIG["kafka-client-id"]["microservice_baked"]}_{HOSTNAME}""",
},
)
CUSTOM_PARTITIONER = get_custom_partitioner()
PARTITIONS_BAKED = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_BAKED)
PARTITIONS_STATUS = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_STATUS)

# Set signal handler
GRACEFUL_SHUTDOWN = GracefulShutdown(consumer=CONSUMER)
Expand All @@ -91,7 +86,6 @@ def pizza_baked(order_id: str):
"timestamp": timestamp_now(),
}
).encode(),
partition=CUSTOM_PARTITIONER(order_id.encode(), PARTITIONS_BAKED),
)
PRODUCER.flush()

Expand Down
10 changes: 1 addition & 9 deletions msvc_delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
get_system_config,
validate_cli_args,
log_event_received,
get_topic_partitions,
set_producer_consumer,
get_custom_partitioner,
import_state_store_class,
)

Expand All @@ -61,7 +59,7 @@
TOPIC_PIZZA_ORDERED = SYS_CONFIG["kafka-topics"]["pizza_ordered"]
TOPIC_PIZZA_BAKED = SYS_CONFIG["kafka-topics"]["pizza_baked"]
CONSUME_TOPICS = [TOPIC_PIZZA_ORDERED, TOPIC_PIZZA_BAKED]
_, PRODUCER, CONSUMER, ADMIN_CLIENT = set_producer_consumer(
_, PRODUCER, CONSUMER, _ = set_producer_consumer(
kafka_config_file,
producer_extra_config={
"on_delivery": delivery_report,
Expand All @@ -72,10 +70,6 @@
"client.id": f"""{SYS_CONFIG["kafka-client-id"]["microservice_delivery"]}_{HOSTNAME}""",
},
)
CUSTOM_PARTITIONER = get_custom_partitioner()
PARTITIONS_DELIVERED = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_DELIVERED)
PARTITIONS_PENDING = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_PENDING)
PARTITIONS_STATUS = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_STATUS)

# Set signal handler
GRACEFUL_SHUTDOWN = GracefulShutdown(consumer=CONSUMER)
Expand Down Expand Up @@ -110,7 +104,6 @@ def pizza_delivered(order_id: str):
"timestamp": timestamp_now(),
}
).encode(),
partition=CUSTOM_PARTITIONER(order_id.encode(), PARTITIONS_DELIVERED),
)
PRODUCER.flush()

Expand All @@ -125,7 +118,6 @@ def pizza_pending(order_id: str):
"timestamp": timestamp_now(),
}
).encode(),
partition=CUSTOM_PARTITIONER(order_id.encode(), PARTITIONS_PENDING),
)
PRODUCER.flush()

Expand Down
13 changes: 5 additions & 8 deletions utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
from logging.handlers import TimedRotatingFileHandler
from confluent_kafka.admin import AdminClient

from utils.murmur2 import Murmur2Partitioner


####################
# Global variables #
Expand Down Expand Up @@ -260,8 +258,12 @@ def set_producer_consumer(

# Set producer config
if not disable_producer:
producer_common_config = {
"partitioner": "murmur2_random",
}
producer = Producer(
{
**producer_common_config,
**config_kafka,
**producer_extra_config,
}
Expand All @@ -278,8 +280,8 @@ def set_producer_consumer(
}
consumer = Consumer(
{
**config_kafka,
**consumer_common_config,
**config_kafka,
**consumer_extra_config,
}
)
Expand Down Expand Up @@ -310,11 +312,6 @@ def get_topic_partitions(
return partitions


def get_custom_partitioner():
p = Murmur2Partitioner()
return p.partition


def delivery_report(err, msg):
"""Reports the failure or success of an event delivery"""
msg_key = "" if msg.key() is None else msg.key().decode()
Expand Down
7 changes: 1 addition & 6 deletions webapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
validate_cli_args,
get_system_config,
set_producer_consumer,
get_topic_partitions,
get_custom_partitioner,
import_state_store_class,
)

Expand All @@ -68,16 +66,14 @@

# Set producer object
PRODUCE_TOPIC_ORDERED = SYS_CONFIG["kafka-topics"]["pizza_ordered"]
_, PRODUCER, _, ADMIN_CLIENT = set_producer_consumer(
_, PRODUCER, _, _ = set_producer_consumer(
kafka_config_file,
producer_extra_config={
"on_delivery": delivery_report,
"client.id": f"""{SYS_CONFIG["kafka-client-id"]["webapp"]}_{HOSTNAME}""",
},
disable_consumer=True,
)
CUSTOM_PARTITIONER = get_custom_partitioner()
PARTITIONS_ORDERED = get_topic_partitions(ADMIN_CLIENT, PRODUCE_TOPIC_ORDERED)

# Set signal handler
GRACEFUL_SHUTDOWN = GracefulShutdown()
Expand Down Expand Up @@ -231,7 +227,6 @@ def order_pizza():
PRODUCE_TOPIC_ORDERED,
key=order_id,
value=json.dumps(order_details).encode(),
partition=CUSTOM_PARTITIONER(order_id.encode(), PARTITIONS_ORDERED),
)
PRODUCER.flush()

Expand Down

0 comments on commit e4bd009

Please sign in to comment.