
More about the Azure Service Bus transport

Mogens Heller Grabe edited this page Dec 5, 2013 · 5 revisions

The Azure Service Bus transport automatically ensures that a topic called Rebus exists. All Rebus activity happens inside this single topic, so you're free to use the same Azure Service Bus namespace for other, non-Rebus-related things.

The transport will create a subscription for each "input queue", which will function as a logical queue.

Each subscription will filter messages by the logical input queue name (which is the same as the name of the subscription, and which is passed as a custom property on the BrokeredMessage), allowing it to receive only messages meant for that particular client.

Only a single topic is used because Azure Service Bus only supports transactional bus operations that span one single topic. By restricting all activity to the Rebus topic, Rebus can guarantee that messages are received and sent atomically, satisfying the same transactional guarantees as the MSMQ and RabbitMQ transports.
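
The layout described above (one shared topic, one filtered subscription per logical input queue) can be pictured with a small in-memory model. This is only a sketch of the routing idea and does not use the Azure Service Bus SDK: SketchTopic, SketchMessage and the property name "LogicalDestinationQueue" are hypothetical names chosen for illustration, and the actual property name used by the transport is not stated on this page.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a message with custom properties (NOT the real BrokeredMessage).
public class SketchMessage
{
    public string Body;
    public readonly Dictionary<string, string> Properties = new Dictionary<string, string>();
}

// In-memory stand-in for the single shared topic.
public class SketchTopic
{
    // Hypothetical property name, used for illustration only.
    public const string DestinationProperty = "LogicalDestinationQueue";

    // One subscription per logical input queue, keyed by the queue name.
    readonly Dictionary<string, Queue<SketchMessage>> _subscriptions =
        new Dictionary<string, Queue<SketchMessage>>();

    public void EnsureSubscription(string inputQueueName)
    {
        if (!_subscriptions.ContainsKey(inputQueueName))
            _subscriptions[inputQueueName] = new Queue<SketchMessage>();
    }

    // Sending stamps the destination onto the message and publishes it to the topic.
    public void Send(string destinationQueueName, string body)
    {
        var message = new SketchMessage { Body = body };
        message.Properties[DestinationProperty] = destinationQueueName;

        // Each subscription's filter only lets through messages addressed to it.
        foreach (var subscription in _subscriptions)
        {
            if (subscription.Key == message.Properties[DestinationProperty])
                subscription.Value.Enqueue(message);
        }
    }

    // Returns the next message for the given logical queue, or null if there is none.
    public SketchMessage Receive(string inputQueueName)
    {
        var queue = _subscriptions[inputQueueName];
        return queue.Count > 0 ? queue.Dequeue() : null;
    }
}
```

In this model, a message sent to "orders" is only ever visible to the "orders" subscription, even though every message passes through the same topic.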

Long-running message handlers

Since Azure Service Bus' transactional receive works by taking a "peek lock" on the incoming message, it is important that all message handlers are capable of finishing their work well within the peek lock timeout.

Rebus sets the peek lock timeout to 5 minutes, which is currently the maximum timeout supported by Azure Service Bus.

If you need more time to finish your work, you can renew the peek lock by invoking a special renewal method (an Action) that is passed in the Items collection of the message context.

The method can be found under the key specified by the const string AzureServiceBusMessageQueue.AzureServiceBusRenewLeaseAction. Consider a typical long-running handler whose definition looks like this:

public class GoSlowHandler : IHandleMessages<GoSlow>
{
    readonly IMessageContext _messageContext;

    public GoSlowHandler(IMessageContext messageContext)
    {
        _messageContext = messageContext;
    }

    // ...
}

Its Handle method could then renew the peek lock once a minute, like this:

public void Handle(GoSlow message)
{
    using(var peekLockRenewalTimer = new System.Timers.Timer(60 * 1000))
    {
        peekLockRenewalTimer.Elapsed += delegate
        {
            var items = _messageContext.Items;
            var key = AzureServiceBusMessageQueue.AzureServiceBusRenewLeaseAction;
            var renewPeekLock = (Action)items[key];

            renewPeekLock();
        };
        peekLockRenewalTimer.Start();

        HandleMessageForAsLongAsItTakes(message);
    }
}
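
If several handlers need this, the timer logic above could be factored into a small reusable helper. The following is a minimal sketch under that assumption: PeriodicRenewer is a hypothetical class, not part of Rebus, and it relies only on System.Timers.Timer from the .NET base class library.

```csharp
using System;

// Hypothetical helper (not part of Rebus): invokes the given action
// at a fixed interval until it is disposed.
public sealed class PeriodicRenewer : IDisposable
{
    readonly System.Timers.Timer _timer;

    public PeriodicRenewer(Action renew, TimeSpan interval)
    {
        if (renew == null) throw new ArgumentNullException("renew");

        _timer = new System.Timers.Timer(interval.TotalMilliseconds);
        _timer.Elapsed += delegate { renew(); };
        _timer.Start();
    }

    public void Dispose()
    {
        // Stop raising Elapsed events, then release the timer.
        _timer.Stop();
        _timer.Dispose();
    }
}

// Possible usage inside the handler shown above:
//
// public void Handle(GoSlow message)
// {
//     var key = AzureServiceBusMessageQueue.AzureServiceBusRenewLeaseAction;
//     var renewPeekLock = (Action)_messageContext.Items[key];
//
//     using (new PeriodicRenewer(renewPeekLock, TimeSpan.FromMinutes(1)))
//     {
//         HandleMessageForAsLongAsItTakes(message);
//     }
// }
```

Keep in mind that System.Timers.Timer raises Elapsed on a thread pool thread by default, so the renewal action runs concurrently with the handler's own work.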