Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added message nack and dead-letter functionality #250

Merged
merged 2 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,13 @@ msgs = await memphis.fetch_messages(
```

### Fetch a single batch of messages after creating a consumer

```python
msgs = await consumer.fetch(batch_size=10) # defaults to 10
```

### Fetch a single batch of messages after creating a consumer
`prefetch = true` will prefetch next batch of messages and save it in memory for future fetch() request<br>

```python
msgs = await consumer.fetch(batch_size=10, prefetch=True) # defaults to False
```
Expand All @@ -917,6 +918,23 @@ Acknowledge a message indicates the Memphis server to not re-send the same messa
await message.ack()
```

### Nacking a Message

Mark the message as not acknowledged - the broker will resend the message immediately to the same consumers group, instead of waiting to the max ack time configured.

```python
await message.nack();
```

### Sending a message to the dead-letter

Sending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups

```python
await message.dead_letter("reason");
```

### Delay the message after a given duration

Delay the message and tell Memphis server to re-send the same message again to the same consumer group. The message will be redelivered only in case `consumer.max_msg_deliveries` is not reached yet.
Expand All @@ -925,7 +943,7 @@ Delay the message and tell Memphis server to re-send the same message again to t
await message.delay(delay_in_seconds)
```

### Get headers
### Get headers

Get headers per message

Expand Down
4 changes: 2 additions & 2 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def __consume(self, callback, partition_key: str = None, consumer_partitio

for msg in msgs:
memphis_messages.append(
Message(msg, self.connection, self.consumer_group, self.internal_station_name)
Message(msg, self.connection, self.consumer_group, self.internal_station_name, partition=partition_number)
)
await callback(memphis_messages, None, self.context)
await asyncio.sleep(self.pull_interval_ms / 1000)
Expand Down Expand Up @@ -255,7 +255,7 @@ async def main(host, username, password, station):
msgs = await self.subscriptions[partition_number].fetch(batch_size)
for msg in msgs:
messages.append(
Message(msg, self.connection, self.consumer_group, self.internal_station_name))
Message(msg,self.connection,self.consumer_group,self.internal_station_name,partition=partition_number))
if prefetch:
number_of_messages_to_prefetch = batch_size * 2
self.load_messages_to_cache(number_of_messages_to_prefetch, partition_number)
Expand Down
35 changes: 34 additions & 1 deletion memphis/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from memphis.station import Station

class Message:
def __init__(self, message, connection, cg_name, internal_station_name):
def __init__(self, message, connection, cg_name, internal_station_name, partition = 0):
self.message = message
self.connection = connection
self.cg_name = cg_name
self.internal_station_name = internal_station_name
self.partition = partition
self.station = Station(connection, internal_station_name)

async def ack(self):
Expand All @@ -37,6 +38,38 @@ async def ack(self):
raise MemphisConnectError(str(e)) from e
return

async def nack(self):
"""
nack - not ack for a message, meaning that the message will be redelivered again to the same consumers group without waiting to its ack wait time.
"""
if not hasattr(self.message, 'nak'):
return
await self.message.nak()
tbazen marked this conversation as resolved.
Show resolved Hide resolved

async def dead_letter(self, reason: str):
"""
dead_letter - Sending the message to the dead-letter station (DLS). the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups
"""
try:
if not hasattr(self.message, 'term'):
return
await self.message.term()
tbazen marked this conversation as resolved.
Show resolved Hide resolved
md = self.message.metadata()
stream_seq = md.sequence.stream
request = {
"station_name": self.internal_station_name,
"error": reason,
"partition": self.partition,
"cg_name": self.cg_name,
"seq": stream_seq,
}
await self.connection.broker_manager.publish(
"$memphis_nacked_dls", json.dumps(request).encode("utf-8")
)
except Exception as e:
raise MemphisConnectError(str(e)) from e

def get_data(self):
"""Receive the message."""
try:
Expand Down