diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 54d1c92..4f62232 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -443,6 +443,10 @@ def on_message(self, channel, method, properties, body, args): """ connection = args[0] task = get_task(body, method.delivery_tag) + # the task needs to be acknowledged in the same channel that it was + # received. So store the channel. This is useful when executing batched + # indexing tasks since they are acknowledged late. + task._channel = channel self.local_queue.put((task, channel, connection)) def process_blocking(self):