Commit 2701848
Merge pull request #29 from superstreamlabs/2.4.0-beta
release
idanasulin2706 authored Sep 27, 2024
2 parents 93c6525 + 889c8dd commit 2701848
Showing 9 changed files with 139 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -8,7 +8,7 @@ To leverage the full capabilities of the Superstream SDK, it is essential to set
|-------------------------------------|------------------|-----------|-------------------------------------------------------------------------------------------------------|
| `SUPERSTREAM_HOST` | - | Yes | Specify the host URL of the Superstream service to connect to the appropriate Superstream environment. |
| `SUPERSTREAM_TOKEN` | - | No | This authentication token is required when the engine is configured to work with local authentication, to securely access the Superstream services. |
- | `SUPERSTREAM_TAGS` | Empty string | No | Set this variable to tag the client. This is a string - comma-separated list of tags. |
+ | `SUPERSTREAM_TAGS` | Empty string | No | Set this variable to tag the client. The value of this variable should be a valid JSON string. |
| `SUPERSTREAM_DEBUG` | False | No | Set this variable to true to enable Superstream logs. By default, there will not be any Superstream related logs. |
| `SUPERSTREAM_RESPONSE_TIMEOUT` | 3000 | No | Set this variable to specify the timeout in milliseconds for the Superstream service response. |
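
For illustration, here is a minimal sketch of wiring these variables up from Python before the client is created. All values shown (host, tags) are placeholders, not defaults shipped with the SDK:

```python
import os

# Placeholder values -- substitute your own Superstream deployment details.
os.environ["SUPERSTREAM_HOST"] = "superstream.example.com"             # required
os.environ["SUPERSTREAM_TAGS"] = '{"env": "staging", "team": "data"}'  # must be valid JSON
os.environ["SUPERSTREAM_DEBUG"] = "true"                               # enable SDK logging
os.environ["SUPERSTREAM_RESPONSE_TIMEOUT"] = "3000"                    # milliseconds
```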

4 changes: 2 additions & 2 deletions patch/README.md
@@ -27,7 +27,7 @@ options:
The `src`, `prefix`, and `output` are required arguments. The `version` argument is optional. An example command is shown below:

```sh
-python3 patch.py --src "/input/path/to/wheel/created/using/pdm" --output "/output/path/to/patched/pkgs" --prefix "superstream-confluent-kafka-beta-2.4.0.1"
+python3 patch.py --src "/input/path/to/wheel/created/using/pdm" --output "/output/path/to/patched/pkgs" --prefix "superstream-confluent-kafka-beta-2.4.0.2"
```

-**The value of `--prefix` should be the same as the name of the package that will be patched followed by the version number.** For example, if the package name is `superstream-confluent-kafka` and the version is `2.4.0.1`, the value of `--prefix` should be `superstream-confluent-kafka-beta-2.4.0.1`.
+**The value of `--prefix` should be the same as the name of the package that will be patched followed by the version number.** For example, if the package name is `superstream-confluent-kafka` and the version is `2.4.0.2`, the value of `--prefix` should be `superstream-confluent-kafka-beta-2.4.0.2`.
2 changes: 1 addition & 1 deletion src/confluent_kafka/superstream/constants.py
@@ -2,7 +2,7 @@


 class SdkInfo:
-    VERSION = "2.4.0.1"
+    VERSION = "2.4.0.2"
     LANGUAGE = "python"

33 changes: 30 additions & 3 deletions src/confluent_kafka/superstream/core.py
@@ -121,6 +121,8 @@ def __init__(
         self.optimized_config_received = False
         self._initial_topic_partition_update_sent = False

+        self._config_update_cb = None
+
     async def _request(
         self,
         subject: str,
@@ -687,6 +689,9 @@ async def is_started():
         if subscription is not None:
             await subscription.unsubscribe()

+    def set_config_update_cb(self, cb: Callable):
+        self._config_update_cb = cb
+
     def process_update(self, update: Update):
         LEARNED_SCHEMA = "LearnedSchema"
         TOGGLE_REDUCTION = "ToggleReduction"
@@ -710,9 +715,31 @@ def toggle_reduction_handler(payload: bytes):
             self.reduction_enabled = reduction_update.enable_reduction

         def compression_update_handler(payload: bytes):
-            compression_update = CompressionUpdate.model_validate_json(payload)
-            if compression_update.compression_type:
-                self.compression_type = compression_update.compression_type
+            try:
+                compression_update = CompressionUpdate.model_validate_json(payload)
+                if compression_update.compression_type:
+                    if not KafkaUtil.is_valid_compression_type(
+                        compression_update.compression_type
+                    ):
+                        raise Exception(
+                            f"invalid compression type: {compression_update.compression_type}"
+                        )
+
+                    self.compression_type = compression_update.compression_type
+                    new_config = KafkaUtil.extract_producer_config(self.configs)
+                    compression_config = KafkaUtil.get_compression_config(
+                        self.compression_type, new_config
+                    )
+                    new_config.update(compression_config)
+                    if self._config_update_cb:
+                        self.std.write(
+                            f"superstream: updating producer config with compression type: {self.compression_type}"
+                        )
+                        self._config_update_cb(new_config)
+            except Exception as e:
+                self.std.error(
+                    f"superstream: error processing compression update: {e!s}"
+                )

         handlers = {
             LEARNED_SCHEMA: learned_schema_handler,
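
`process_update` dispatches each incoming update to a handler selected by its type, and the new compression path validates the codec before mutating any producer state. A self-contained sketch of that dispatch-and-validate pattern follows; the names and payload shape here are illustrative, not the SDK's actual API:

```python
from typing import Callable, Dict

VALID_COMPRESSION_TYPES = {"none", "gzip", "snappy", "lz4", "zstd"}

def compression_update_handler(payload: dict) -> None:
    # Reject unknown codecs before applying anything.
    codec = payload.get("compression_type")
    if codec not in VALID_COMPRESSION_TYPES:
        raise ValueError(f"invalid compression type: {codec}")
    print(f"superstream: updating producer config with compression type: {codec}")

HANDLERS: Dict[str, Callable[[dict], None]] = {
    "CompressionUpdate": compression_update_handler,
}

def process_update(update_type: str, payload: dict) -> None:
    handler = HANDLERS.get(update_type)
    if handler is None:
        return
    try:
        handler(payload)
    except Exception as e:
        print(f"superstream: error processing {update_type}: {e!s}")

process_update("CompressionUpdate", {"compression_type": "zstd"})    # applied
process_update("CompressionUpdate", {"compression_type": "brotli"})  # logged error
```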
5 changes: 4 additions & 1 deletion src/confluent_kafka/superstream/producer_interceptor.py
@@ -13,12 +13,15 @@


 class SuperstreamProducerInterceptor:
     def __init__(self, config: Dict, producer_handler: Callable | None = None):
-        self._compression_type = "zstd"
         self._superstream_config_ = Superstream.init_superstream_props(
             config, SuperstreamClientType.PRODUCER
         )
         self._producer_handler = producer_handler

+    def set_config_update_cb(self, config_update_cb: Callable):
+        if self.superstream:
+            self.superstream.set_config_update_cb(config_update_cb)
+
     def set_producer_handler(self, producer_handler: Callable):
         self._producer_handler = producer_handler

75 changes: 71 additions & 4 deletions src/confluent_kafka/superstream/superstream_producer.py
@@ -1,17 +1,84 @@
+import asyncio
+import queue
+import threading
 from typing import Dict

 from confluent_kafka.cimpl import Producer as _ProducerImpl

 from .producer_interceptor import SuperstreamProducerInterceptor


-class SuperstreamProducer(_ProducerImpl):
+class SuperstreamProducer:
     def __init__(self, config: Dict):
+        self._lock = threading.Lock()
+        self._message_queue = queue.Queue()
         self._interceptor = SuperstreamProducerInterceptor(config)
         config = self._interceptor.wait_for_superstream_configs_sync(config)
         self._interceptor.set_full_configuration(config)
-        super().__init__(config)
-        self._interceptor.set_producer_handler(super().produce)
+        self._p = _ProducerImpl(config)
+        self._interceptor.set_producer_handler(self._p.produce)
+        self._interceptor.set_config_update_cb(self._update_config)
+        self._config = config
+        self._loop = asyncio.get_event_loop()
+
+    def __len__(self):
+        return len(self._p)

     def produce(self, *args, **kwargs):
-        self._interceptor.produce(*args, **kwargs)
+        if self._lock.locked():
+            self._message_queue.put((args, kwargs))
+        else:
+            self._interceptor.produce(*args, **kwargs)
+
+    async def _produce_messages_from_queue(self):
+        while not self._message_queue.empty():
+            args, kwargs = self._message_queue.get()
+            self._interceptor.produce(*args, **kwargs)
+
+    def _update_config(self, new_config: Dict):
+        with self._lock:
+            try:
+                self.poll(0)
+                self.flush()
+
+                self._p = _ProducerImpl(new_config)
+
+                self._interceptor.set_full_configuration(new_config)
+                self._interceptor.set_producer_handler(self._p.produce)
+                asyncio.run_coroutine_threadsafe(
+                    self._produce_messages_from_queue(), self._loop
+                )
+
+            except Exception as e:
+                print(e)
+
+    def poll(self, *args, **kwargs):
+        return self._p.poll(*args, **kwargs)
+
+    def flush(self, *args, **kwargs):
+        return self._p.flush(*args, **kwargs)
+
+    def purge(self, *args, **kwargs):
+        return self._p.purge(*args, **kwargs)
+
+    def list_topics(self, *args, **kwargs):
+        return self._p.list_topics(*args, **kwargs)
+
+    def init_transactions(self, *args, **kwargs):
+        return self._p.init_transactions(*args, **kwargs)
+
+    def begin_transaction(self, *args, **kwargs):
+        return self._p.begin_transaction(*args, **kwargs)
+
+    def send_offsets_to_transaction(self, *args, **kwargs):
+        return self._p.send_offsets_to_transaction(*args, **kwargs)
+
+    def commit_transaction(self, *args, **kwargs):
+        return self._p.commit_transaction(*args, **kwargs)
+
+    def abort_transaction(self, *args, **kwargs):
+        return self._p.abort_transaction(*args, **kwargs)
+
+    def set_sasl_credentials(self, *args, **kwargs):
+        return self._p.set_sasl_credentials(*args, **kwargs)
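
The producer now wraps the C implementation by composition instead of inheritance, so `_update_config` can flush the old `_ProducerImpl`, build a new one from the optimized config, and re-point the interceptor at it. Calls to `produce` that race with the swap see the held lock and are parked in a queue, then replayed once the new producer is live. A stripped-down, dependency-free sketch of that park-and-replay pattern (all names illustrative):

```python
import queue
import threading

class SwappableSink:
    """Parks writes while a config swap holds the lock, then replays them."""

    def __init__(self, send):
        self._lock = threading.Lock()
        self._pending = queue.Queue()
        self._send = send  # stands in for the underlying producer's produce()

    def produce(self, msg):
        # While a swap is in flight, park the message instead of sending.
        if self._lock.locked():
            self._pending.put(msg)
        else:
            self._send(msg)

    def swap(self, new_send):
        # Producers racing with this call observe locked() and queue up.
        with self._lock:
            self._send = new_send
        # Replay anything parked during the swap, in arrival order.
        while not self._pending.empty():
            self._send(self._pending.get())

sink = SwappableSink(lambda m: print("v1:", m))
sink.produce("hello")                 # sent immediately via v1
sink.swap(lambda m: print("v2:", m))  # swap in the new sender
sink.produce("world")                 # sent via v2
```

The commit itself replays the queue via `asyncio.run_coroutine_threadsafe` on the event loop captured at construction time; the sketch replays synchronously to keep the example short.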
32 changes: 28 additions & 4 deletions src/confluent_kafka/superstream/utils.py
@@ -254,8 +254,6 @@ class KafkaUtil:
         "compression.type",
         "compression.level",
         "queue.buffering.backpressure.threshold",
-        "compression.codec",
-        "compression.type",
         "batch.num.messages",
         "batch.size",
         "delivery.report.only.error",
@@ -483,6 +481,28 @@ class KafkaUtil:
         "consume.callback.max.messages": 0,
     }

+    @staticmethod
+    def is_valid_compression_type(compression_type):
+        return compression_type in ["none", "gzip", "snappy", "lz4", "zstd"]
+
+    @staticmethod
+    def extract_producer_config(config: Dict[str, Any]) -> Dict[str, Any]:
+        producer_config = {}
+        producer_keys = (
+            KafkaUtil.ProducerConfigKeys + KafkaUtil.ProducerAndConsumerConfigKeys
+        )
+        for key in config:
+            if key in producer_keys:
+                producer_config[key] = config[key]
+        return producer_config
+
+    @staticmethod
+    def get_compression_config(compression_type, full_config: dict = None) -> dict:
+        compression_config = {
+            "compression.type": compression_type,
+        }
+        return compression_config
+
     @staticmethod
     def is_valid_producer_key(key):
         return (
@@ -508,7 +528,9 @@ def extract_kafka_config(config: Dict[str, Any]) -> Dict[str, Any]:
     @staticmethod
     def enrich_producer_config(config: Dict[str, Any]) -> Dict[str, Any]:
         enriched_config = config.copy() if config else {}
-        producer_keys = KafkaUtil.ProducerConfigKeys + KafkaUtil.ProducerAndConsumerConfigKeys
+        producer_keys = (
+            KafkaUtil.ProducerConfigKeys + KafkaUtil.ProducerAndConsumerConfigKeys
+        )
         for key in producer_keys:
             if key in enriched_config:
                 continue
@@ -520,7 +542,9 @@ def enrich_producer_config(config: Dict[str, Any]) -> Dict[str, Any]:
     @staticmethod
     def enrich_consumer_config(config: Dict[str, Any]) -> Dict[str, Any]:
         enriched_config = config.copy() if config else {}
-        consumer_keys = KafkaUtil.ConsumerConfigKeys + KafkaUtil.ProducerAndConsumerConfigKeys
+        consumer_keys = (
+            KafkaUtil.ConsumerConfigKeys + KafkaUtil.ProducerAndConsumerConfigKeys
+        )
        for key in consumer_keys:
             if key in enriched_config:
                 continue
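
To make the flow in `compression_update_handler` concrete, here is a small self-contained example of the filter-then-overlay step the new helpers perform. The key list is abbreviated and the config values are made up; the real methods filter against `KafkaUtil.ProducerConfigKeys` and `KafkaUtil.ProducerAndConsumerConfigKeys`:

```python
from typing import Any, Dict

PRODUCER_KEYS = ["compression.type", "batch.size", "linger.ms"]  # abbreviated list

def extract_producer_config(config: Dict[str, Any]) -> Dict[str, Any]:
    # Keep only the keys a producer accepts.
    return {k: v for k, v in config.items() if k in PRODUCER_KEYS}

def get_compression_config(compression_type: str) -> Dict[str, Any]:
    return {"compression.type": compression_type}

full_config = {
    "group.id": "my-group",   # consumer-only key: filtered out
    "batch.size": 16384,
    "compression.type": "none",
}

new_config = extract_producer_config(full_config)
new_config.update(get_compression_config("zstd"))
print(new_config)  # {'batch.size': 16384, 'compression.type': 'zstd'}
```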
2 changes: 1 addition & 1 deletion version-beta.conf
@@ -1 +1 @@
-2.4.18
+2.4.19
2 changes: 1 addition & 1 deletion version.conf
@@ -1 +1 @@
-2.4.0.1
+2.4.0.2
