Best Practices for Leveraging Azure Service Bus Brokered Messaging

This article offers practical guidance for developers working with the brokered messaging .NET managed API in the Azure Service Bus. The recommendations supplied in this article come directly from recent customer projects. While building real-world solutions with the Service Bus, we learned about some key best practices and little-known secrets that will help increase reliability and improve the performance of the solutions leveraging the new brokered messaging capabilities in the Service Bus. This article intends to share these learnings with the developer community.

Relayed vs Brokered Messaging

The Windows Azure Service Bus provides two comprehensive messaging solutions. The first solution is available through a centralized, highly load-balanced “relay” service running in the cloud that supports a variety of transport protocols and Web services standards, including SOAP, WS-*, and REST. The relay service supports direct one-way messaging, request/response messaging, and peer-to-peer messaging. The pattern associated with this type of messaging solution is referred to as “relayed” messaging. In the relayed messaging pattern, an on-premises or cloud-based service connects to the relay service through an outbound port and creates a bi-directional socket for communication tied to a particular rendezvous address. The client doesn’t need to know where the service resides, and the on-premises service does not need any inbound ports open on the firewall. Relayed messaging provides many benefits, but requires the server and client to both be online at the same time in order to send and receive messages. Relayed messaging has been available since the initial release of the Service Bus.

The second messaging solution, introduced in the latest version of the Service Bus, enables “brokered” messaging capabilities. The brokered messaging scheme can also be thought of as asynchronous or “temporally decoupled” messaging. Producers (senders) and consumers (receivers) do not have to be online at the same time. The messaging infrastructure reliably stores messages until the consuming party is ready to receive them. This allows the components of a distributed application to be disconnected, either voluntarily (for example, for maintenance) or because of a component crash, without affecting the whole system. Furthermore, the receiving application may only have to come online during certain times of the day, such as an inventory management system that is only required to run at the end of a business day.

The core components of the Service Bus brokered messaging infrastructure are queues, topics, and subscriptions. These components enable new asynchronous messaging scenarios, such as temporal decoupling, publish/subscribe, load leveling, and load balancing. For more information about these scenarios, see the Additional Resources section.

Brokered Messaging API Overview

Throughout this guidance, you will see many references to various components, classes and types available in the brokered messaging .NET managed API. To put things into context, let's start off by highlighting some of the key API artifacts that deliver and support the brokered messaging capability in the Service Bus.

The following classes are the most frequently-used API members from the Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging namespaces, often involved when developing a brokered messaging solution:

Class Name | Description
BrokeredMessage | Represents the unit of communication between Service Bus clients. Serialized BrokeredMessage instances are transmitted over the wire when messaging clients communicate via queues and topics.
QueueClient | Represents a messaging object that enables sending and receiving messages from a Service Bus queue.
QueueDescription | Represents a metadata object describing a Service Bus queue, including queue path, behavioral settings (such as lock duration, default TTL, duplicate detection) and informational data points (such as current queue length and size).
TopicClient | Represents a messaging object that enables sending messages to a Service Bus topic.
TopicDescription | Represents a metadata object describing a Service Bus topic, including topic path, behavioral settings (such as duplicate detection) and informational data points (such as current size and maximum topic size).
SubscriptionClient | Represents a messaging object that enables receiving messages from a Service Bus subscription.
SubscriptionDescription | Represents a metadata object describing a Service Bus subscription, including subscription name, owning topic path, behavioral settings (such as session support, default TTL, lock duration) and informational data points (such as current message count).
NamespaceManager | Represents a management object responsible for runtime operations on Service Bus messaging entities (queues, topics, subscriptions, rules), including creating, retrieving and deleting them and checking whether they exist.
MessagingFactory | Represents a factory object responsible for instantiating, tracking and managing the lifecycle of the messaging entity clients such as TopicClient, QueueClient and SubscriptionClient.
MessageReceiver | Represents an abstract messaging object that supports rich messaging functionality with a particular focus on the message receive operations.
MessageSender | Represents an abstract messaging object that supports rich messaging functionality with a particular focus on the message send operations.
MessageSession | Represents a message session that allows grouping of related messages for processing in a single transaction.
Filter | Represents an abstract metadata object that consists of a filter expression and an associated action that is executed in the Service Bus subscription evaluation engine. The Filter class is the base class for TrueFilter, FalseFilter, SqlFilter and CorrelationFilter, which implement the metadata object for a given filter type.
TokenProvider | Represents a factory object that provides access to the different types of security token providers responsible for the acquisition of SAML, Shared Secret and Simple Web tokens.

It is recommended that you familiarize yourself with the above API artifacts to get a head start on building your first brokered messaging solution with the Service Bus. Please note that the above is not an exhaustive list of all classes found in the brokered messaging API. For a complete landscape of all API members, please refer to the MSDN documentation.

Best Practices in Brokered Messaging API

The topics in this section are intended to share specific recommendations that were derived from hands-on experience with the .NET managed brokered messaging API. The goal of these recommendations is to encourage developers to apply the techniques and patterns discussed below, in order to be able to deliver robust messaging solutions.

Managing the Messaging Object Lifecycle

Messaging objects such as TopicClient, QueueClient and SubscriptionClient are intended to be created once and reused whenever possible. These objects are thread-safe, enabling you to send or receive messages in parallel from multiple threads. Creating a messaging object incurs a small overhead for authenticating the client request with the Access Control Service (ACS), so it is recommended that you cache TopicClient, QueueClient and SubscriptionClient object instances. For optimal resource utilization, consider limiting the cache scope to the lifetime of the messaging component that uses the respective Service Bus messaging objects.

The lifetime of a messaging object begins when a new instance is retrieved from the MessagingFactory object:

// The following actions are often performed upon initialization of an application-level messaging component.

string issuerName = "Issuer Name is retrieved from configuration file";
string issuerSecret = "Issuer Secret also comes from configuration file";
string serviceNamespace = "contoso-cloud";
string queuePath = "PurchaseOrders";

var credentials = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
var address = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);
var messagingFactory = MessagingFactory.Create(address, credentials);
var queueClient = messagingFactory.CreateQueueClient(queuePath, ReceiveMode.ReceiveAndDelete);

As mentioned earlier, a single QueueClient object can be reused for sending or receiving messages to or from a given queue. There is no need to create a new instance of the QueueClient object for every message that is being sent or received:

for (int i = 0; i < 5; i++)
{
    using (var msg = new BrokeredMessage(String.Format("Message #{0}", i)))
    {
        queueClient.Send(msg);
    }
}

It is important to note that the messaging objects maintain an active connection back to the Service Bus messaging infrastructure hosted on the Windows Azure platform. As with many other types of multi-tenant services, the brokered messaging services provided by the Service Bus are subject to quotas with respect to how many active concurrent connections a single messaging entity (such as a queue, topic or subscription) can support. To minimize the number of concurrent connections, it is advisable to explicitly control the lifetime of the Service Bus messaging objects and close them if you don’t plan to re-use them at a later stage. You should also close the messaging factory object upon a graceful termination of your messaging solution.

Note
Closing a messaging object does not close the underlying connection since multiple messaging objects share the connection at a factory level. In turn, closing a messaging factory object will close the underlying connection to the Service Bus messaging infrastructure.

To close a given messaging object, you must invoke its Close() method using one of the following techniques:

// When a queue client is no longer required, let's close it so that it doesn't consume a connection.

// Option #1: Closing a specific messaging object instance.
queueClient.Close();

// Option #2: Closing all messaging objects tracked by the messaging factory.
messagingFactory.Close();

It is also worth noting that in some rare cases, the messaging objects may end up in a state that prevents them from being closed gracefully. Should such a situation occur, the brokered messaging API takes the appropriate actions, including aborting a connection if it cannot be closed successfully. You do not need to perform a status check to decide whether to call the Abort() or Close() method; the API does this internally. Please note that the Close() method is not guaranteed to complete without throwing an exception. Therefore, if you want to ensure that closing a messaging object is always safe, an additional layer of protection in the form of a try/catch construct is recommended.
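The try/catch protection described above could be wrapped in a small helper. The following is a minimal sketch rather than part of the brokered messaging API: the SafeClose class and its TryClose method are hypothetical names, and the helper accepts any parameterless close delegate so that it depends only on the .NET base class library.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper (not part of the Service Bus API) that guards a Close() call.
public static class SafeClose
{
    // Invokes the supplied close delegate and reports whether it completed without throwing.
    public static bool TryClose(Action close, string objectName)
    {
        if (close == null) throw new ArgumentNullException("close");

        try
        {
            close();
            return true;
        }
        catch (Exception ex)
        {
            // A shutdown path should not fail just because a messaging object could not be closed gracefully.
            Trace.TraceWarning("Failed to close {0}: {1}", objectName, ex.Message);
            return false;
        }
    }
}
```

Assuming the queueClient and messagingFactory variables from the earlier samples, a shutdown routine might call SafeClose.TryClose(queueClient.Close, "queueClient") followed by SafeClose.TryClose(messagingFactory.Close, "messagingFactory").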

Note
Although closing a messaging object is not equal to a dispose action, a closed object cannot be re-opened if you later decide to reuse the closed instance. If you attempt to invoke an operation against a closed messaging object, you may receive a self-explanatory exception such as “This messaging entity has already been closed, aborted, or disposed”. There is no public Open() method that can be called from the client to restore a messaging object to an opened state. You must create a new instance of the messaging object. This recommendation also applies to the MessagingFactory objects.

The lifetime of a messaging object ends upon calling the Close() method. The easiest way to ensure that all messaging objects used by a solution are gracefully terminated is by explicitly closing the MessagingFactory object used to create messaging clients for queues, topics and subscriptions. An explicit close on MessagingFactory implicitly closes all messaging objects created and owned by the class. For example, you may want to close the factory object inside the Dispose() method of your messaging component, inside the OnStop() method provided by RoleEntryPoint or from within the UnhandledException event handler.

Dealing with Faulted Messaging Objects

WCF developers are generally aware that a WCF communication object requires special handling of its internal state transitions; in particular, those situations in which the WCF object ends up in a faulted state. Often, the WCF communication stack has to be reset, for instance by recreating a client channel, in order to recover from this condition.

The brokered messaging API provides “out-of-the-box” resilience against faulted communication objects by handling and recovering from conditions that can make the underlying communication objects unusable. Unlike traditional WCF clients, Service Bus messaging clients that leverage the brokered messaging API don’t need to implement any special logic in order to deal with faulted communication objects. All communication objects such as MessagingFactory, QueueClient, TopicClient, SubscriptionClient, MessageSender, and MessageReceiver will automatically detect and recover from exceptions that could potentially bring the communication stack into a non-operational state.

Certain messaging operations such as Complete, Abandon and Defer cannot provide a seamless automatic recovery. If Complete() or Abandon() fails with a MessagingCommunicationException, the only recourse is to receive another message, possibly the same one that failed upon completion, provided a competing consumer didn’t retrieve it in the meantime.

Handling Transient Communication Errors

To improve the reliability of a solution that uses the Service Bus brokered messaging managed API, it is recommended that you adopt a consistent approach to handling transient faults and intermittent errors. Such errors can surface whenever the solution communicates with the multi-tenant, cloud-based queuing and publish/subscribe messaging infrastructure provided by the Service Bus.

When considering a specific technique for detecting transient conditions, you may want to reuse existing technical solutions such as the Transient Fault Handling Framework or build your own. In both cases, you will need to ensure that only a subset of communication exceptions is treated as transient before attempting to recover from the respective faults.

The table below lists the exceptions that can be compensated for by implementing retry logic:

Exception Type | Recommendation
ServerBusyException | This exception can be caused by an intermittent fault in the Service Bus messaging service infrastructure that is not able to process a request due to point-in-time abnormal load conditions. The client can attempt to retry with a delay. A back-off delay would be preferable to prevent adding unnecessary pressure to the server.
MessagingCommunicationException | This exception signals a communication error that can manifest itself when a connection from the messaging client to the Service Bus infrastructure cannot be successfully established. In most cases, provided network connectivity exists, this error can be treated as transient. The client can attempt to retry the operation that has resulted in this type of exception. It is also recommended that you verify whether the domain name resolution service (DNS) is operational, as this error may indicate that the target host name cannot be resolved.
TimeoutException | This exception indicates that the Service Bus messaging service infrastructure did not respond to the requested operation within the specified time, which is controlled by the OperationTimeout setting. The requested operation may still have been completed; however, due to network or other infrastructure delays, the response may not have reached the client in a timely fashion. Compensating for this type of exception must be done with caution. If a message has been delivered to a queue but the response has timed out, resending the original message will result in duplication.
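The back-off delay suggested for ServerBusyException can be computed in many ways. The following is a minimal sketch of one common choice, a capped exponential back-off. The BackoffCalculator class, its GetDelay method and the parameter names are hypothetical and are not part of the Service Bus API or of any retry framework; a retry framework you already use may provide its own equivalent.

```csharp
using System;

// Hypothetical helper that computes a capped exponential back-off delay.
public static class BackoffCalculator
{
    // Returns the delay to wait before retry number 'attempt' (0-based):
    // initialDelay * 2^attempt, never exceeding maxDelay.
    public static TimeSpan GetDelay(int attempt, TimeSpan initialDelay, TimeSpan maxDelay)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException("attempt");

        // Cap the exponent so that the multiplication cannot overflow for large attempt counts.
        double factor = Math.Pow(2, Math.Min(attempt, 30));
        double delayMs = Math.Min(initialDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);

        return TimeSpan.FromMilliseconds(delayMs);
    }
}
```

With an initial delay of one second and a cap of thirty seconds, successive attempts wait 1, 2, 4, 8 and 16 seconds, after which every further attempt waits 30 seconds.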

For more detailed information about different types of exceptions that can be reported by the Service Bus messaging API, see the “Messaging Exceptions” topic in the MSDN documentation.

Note
When handling transient communication errors, beware of transient exceptions masked by outer exceptions of a different type. For example, a timeout may return to the caller in the form of a communication error that hides the original timeout as an inner exception. It is therefore recommended that you inspect all inner exceptions of a given exception object in a recursive fashion to be able to reliably detect transient communication errors. The ServiceBusTransientErrorDetectionStrategy class in the Transient Fault Handling Framework provides an example of how this can be accomplished.
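The recursive inspection of inner exceptions could look roughly like the sketch below. This is not the implementation used by ServiceBusTransientErrorDetectionStrategy; the TransientErrorInspector class and the ContainsTransient method are hypothetical names, and the caller supplies the predicate that decides which exception types count as transient.

```csharp
using System;

// Hypothetical helper that searches an exception and everything nested inside it.
public static class TransientErrorInspector
{
    // Returns true if 'ex' or any exception nested inside it satisfies 'isTransient'.
    public static bool ContainsTransient(Exception ex, Func<Exception, bool> isTransient)
    {
        if (isTransient == null) throw new ArgumentNullException("isTransient");
        if (ex == null) return false;
        if (isTransient(ex)) return true;

        // An AggregateException can carry several inner exceptions, so each one is checked.
        var aggregate = ex as AggregateException;
        if (aggregate != null)
        {
            foreach (var inner in aggregate.InnerExceptions)
            {
                if (ContainsTransient(inner, isTransient)) return true;
            }
            return false;
        }

        // Otherwise, walk down the single InnerException chain.
        return ContainsTransient(ex.InnerException, isTransient);
    }
}
```

For example, ContainsTransient(ex, e => e is TimeoutException) detects a timeout hidden inside a communication error. In a Service Bus solution, the predicate would typically also test for ServerBusyException and MessagingCommunicationException.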

The following code snippet demonstrates how to asynchronously send a message to a Service Bus topic while ensuring that all known transient faults will be compensated by a retry. Please note that this code sample maintains a dependency on the Transient Fault Handling Framework.

var credentials = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
var address = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);
var messagingFactory = MessagingFactory.Create(address, credentials);
var topicClient = messagingFactory.CreateTopicClient(topicPath);
var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

// Create an instance of the object that represents message payload.
var payload = XDocument.Load("InventoryFile.xml");

// Declare a BrokeredMessage instance outside so that it can be reused across all 3 delegates below.
BrokeredMessage msg = null;

// Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
retryPolicy.ExecuteAction
(
    (cb) =>
    {
        // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
        // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
        msg = new BrokeredMessage(payload.Root, new DataContractSerializer(typeof(XElement)));

        // Send the event asynchronously.
        topicClient.BeginSend(msg, cb, null);
    },
    (ar) =>
    {
        try
        {
            // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
            topicClient.EndSend(ar);
        }
        finally
        {
            // Ensure that any resources allocated by a BrokeredMessage instance are released.
            if (msg != null)
            {
                msg.Dispose();
                msg = null;
            }
        }
    },
    (ex) =>
    {
        // Always dispose the BrokeredMessage instance even if the send operation has completed unsuccessfully.
        if (msg != null)
        {
            msg.Dispose();
            msg = null;
        }

        // Always log exceptions.
        Trace.TraceError(ex.Message);
    }
);

The next code sample shows how to reliably create a new or retrieve an existing Service Bus topic. This code also maintains a dependency on the Transient Fault Handling Framework which will automatically retry the corresponding management operation if it fails to be completed successfully due to intermittent connectivity issues or other types of transient conditions:

public TopicDescription GetOrCreateTopic(string issuerName, string issuerSecret, string serviceNamespace, string topicName)
{
    // Must validate all input parameters here. Use Code Contracts or build your own validation.
    var credentials = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
    var address = ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, String.Empty);
    var nsManager = new NamespaceManager(address, credentials);
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);
            
    TopicDescription topic = null;
    bool createNew = false;

    try
    {
        // First, let's see if a topic with the specified name already exists.
        topic = retryPolicy.ExecuteAction<TopicDescription>(() => { return nsManager.GetTopic(topicName); });

        createNew = (topic == null);
    }
    catch (MessagingEntityNotFoundException)
    {
        // Looks like the topic does not exist. We should create a new one.
        createNew = true;
    }

    // If a topic with the specified name doesn't exist, it will be auto-created.
    if (createNew)
    {
        try
        {
            var newTopic = new TopicDescription(topicName);

            topic = retryPolicy.ExecuteAction<TopicDescription>(() => { return nsManager.CreateTopic(newTopic); });
        }
        catch (MessagingEntityAlreadyExistsException)
        {
            // A topic under the same name was already created by someone else, perhaps by another instance. Let's just use it.
            topic = retryPolicy.ExecuteAction<TopicDescription>(() => { return nsManager.GetTopic(topicName); });
        }
    }

    return topic;
}

In summary, it is advisable to assess the likelihood of a failure occurring and to determine the feasibility of adding resilience. Virtually all messaging operations can be subject to transient conditions. When calling into the brokered messaging API, it is therefore recommended that you always take appropriate actions to recover from intermittent issues.

Sending Messages Asynchronously

In order to take advantage of advanced performance features in the Service Bus such as client-side batching, you should always consider using the asynchronous programming model when implementing a messaging solution using the brokered messaging managed API. The asynchronous messaging pattern will enable you to build solutions that can generally avoid blocking on I/O-bound operations such as sending and receiving messages.

When you invoke an API method asynchronously, control returns immediately to your code and your application continues to execute while the asynchronous operation is being executed independently. Your application either monitors the asynchronous operation or receives notification by way of a callback when the operation is complete. At this time, your application can obtain and process the results.

It is important to note that when you invoke synchronous operations, for example the Send() or Receive() methods in the QueueClient class (or other synchronous methods provided by Service Bus brokered messaging API), internally the API code flows through the asynchronous versions of the respective methods, albeit in a blocking fashion. However, using the synchronous versions of these methods may not render the full spectrum of performance-related benefits that you can expect when calling the asynchronous versions directly. This is particularly apparent when you are sending or receiving multiple messages and want to perform other processing while the respective messaging operations are being executed.

Note
A BrokeredMessage object represents a message, and is provided for the purpose of transmitting data across the wire. As soon as a BrokeredMessage object is sent to a queue or topic, it is consumed by the underlying messaging stack and cannot be reused for further operations. This is due to the fact that once the message body is read, the stream that projects the message data cannot be rewound. You should retain the source data used to construct a BrokeredMessage instance until you can reliably assert the success of the messaging operation. If a failed messaging operation requires a retry, you should construct a new BrokeredMessage instance using that source data.

The following code snippet demonstrates how to send multiple messages asynchronously (as well as reliably) while maintaining the order in which the messages are being sent:

// This sample assumes that a queue client is declared and initialized earlier.

// Declare the list of messages that will be sent.
List<XElement> messages = new List<XElement>();

// Populate the list of messages.
for (int i = 0; i < msgCount; i++)
{
    messages.Add(XDocument.Load(new StringReader(String.Format(@"<root><msg num=""{0}""/></root>", i))).Root);
}

// Declare a list in which sent messages will be tracked.
var sentMessages = new List<XElement>();

// Declare a wait object that will be used for synchronization.
var waitObject = new ManualResetEvent(false);

// Declare a timeout value during which the messages are expected to be sent.
var sentTimeout = TimeSpan.FromMinutes(10);

// Declare and initialize an action that will be calling the asynchronous messaging operation.
Action<XElement> sendAction = null;
sendAction = ((payload) =>
{
    // Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
            // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
            BrokeredMessage msg = new BrokeredMessage(payload, new DataContractSerializer(typeof(XElement)));

            // Send the message asynchronously.
            queueClient.BeginSend(msg, cb, Tuple.Create<XElement, BrokeredMessage>(payload, msg));
        },
        (ar) =>
        {
            // Obtain the state object containing the brokered message being sent.
            var state = ar.AsyncState as Tuple<XElement, BrokeredMessage>;

            try
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
                queueClient.EndSend(ar);

                // Track sent messages so that we can determine what was actually sent.
                sentMessages.Add(state.Item1);

                // Get the next message to be sent.
                var nextMessage = sentMessages.Count < messages.Count ? messages[sentMessages.Count] : null;

                // Make sure we actually have another message to be sent.
                if (nextMessage != null)
                {
                    // If so, call the Send action again to send the next message.
                    sendAction(nextMessage);
                }
                else
                {
                    // Otherwise, signal the end of the messaging operation.
                    waitObject.Set();
                }
            }
            finally
            {
                // Ensure that any resources allocated by a BrokeredMessage instance are released.
                // Use the short-circuiting && operator so that Item2 is not evaluated when state is null.
                if (state != null && state.Item2 != null)
                {
                    state.Item2.Dispose();
                }
            }
        },
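        (ex) =>
        {
            // Always log exceptions.
            Trace.TraceError(ex.Message);

            // Signal the waiting thread so that it does not block for the full timeout after an unrecoverable failure.
            waitObject.Set();
        }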
    );
});

// Start with sending the first message.
sendAction(messages[0]);

// Perform other processing while the messages are being sent.
// ...

// Wait until the messaging operations are completed.
bool completed = waitObject.WaitOne(sentTimeout);
waitObject.Dispose();

if (completed && sentMessages.Count == messages.Count)
{
    // Handle successful completion.
}
else
{
    // Handle timeout condition (or a failure to send all messages).
}

Whenever possible, avoid parallelizing the messaging operations using the default scheduling and work partitioning algorithms provided by the Task Parallel Library (TPL) and Parallel LINQ (PLINQ). The basic TPL facilities are best suited for adding parallelism and concurrency to compute-bound operations. Using the TPL “as is” to improve the performance of I/O-bound code such as networking calls and messaging operations may not produce the improvements you would expect. The best way to leverage the TPL to support asynchronous operations is through the use of advanced TPL patterns that conform to the asynchronous programming model.
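One TPL pattern that works with the asynchronous programming model is Task.Factory.FromAsync, which wraps a Begin/End method pair in a Task without blocking a thread while the I/O is in flight. The sketch below is an illustration under stated assumptions, not a prescribed implementation: the ApmTaskAdapter class and its SendAsync method are hypothetical names, and the helper is written against delegates so that it depends only on the .NET base class library.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical adapter that exposes a Begin/End send pair as a Task.
public static class ApmTaskAdapter
{
    // Wraps a BeginSend/EndSend style method pair in a Task that completes when the send completes.
    public static Task SendAsync<TMessage>(
        Func<TMessage, AsyncCallback, object, IAsyncResult> beginSend,
        Action<IAsyncResult> endSend,
        TMessage message)
    {
        if (beginSend == null) throw new ArgumentNullException("beginSend");
        if (endSend == null) throw new ArgumentNullException("endSend");

        // FromAsync calls beginSend immediately and runs endSend when the operation finishes.
        // Any exception thrown by endSend is surfaced through the returned task.
        return Task.Factory.FromAsync(beginSend, endSend, message, (object)null);
    }
}
```

Assuming the queueClient and msg variables from the earlier samples, the call would be ApmTaskAdapter.SendAsync<BrokeredMessage>(queueClient.BeginSend, queueClient.EndSend, msg). The returned task can then be composed with continuations. Retry logic and disposal of the message still need to be handled as shown in the preceding sample.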

Receiving Messages Asynchronously

As with sending, you can apply the asynchronous programming model to receiving messages from the Service Bus.

While waiting for new messages on either a Service Bus queue or a subscription, your solution will often be issuing a polling request. Fortunately, the Service Bus offers a long-polling receive operation which maintains a connection to the server until a message arrives on a queue or the specified timeout period has elapsed, whichever occurs first. If a long-polling receive is performed synchronously, it will block a CLR thread pool thread while waiting for a new message, which is not considered optimal. The capacity of the CLR thread pool is generally limited; hence there is good reason to avoid using the thread pool for particularly long-running operations.

To build a truly effective messaging solution using the Service Bus brokered messaging API, you should always perform the receive operation asynchronously. Whether your solution receives one message at a time or fetches multiple messages, you begin the receive operation using the BeginReceive() method with the specified timeout. In the current API, the maximum receive timeout value is 24 days. While the Service Bus messaging client is waiting on your behalf for a new message, your solution can proceed with performing any other work. Upon completion, your callback method will be notified and the message that was received (if any) will be available for processing.

Note
Once a message is received from a queue or subscription, its body can only be read once. Due to the nature of network protocols, message data streams are not always “rewindable”, because they often do not support a seek operation. You should secure the message data by placing it into an object after calling the GetBody() method, then keep that object for as long as you need it. Attempting to invoke the GetBody() method more than once is not supported by the brokered messaging API.

The code sample below shows an example of a programming method that asynchronously receives the specified number of messages from a Service Bus queue:

public static IEnumerable<T> Get<T>(QueueClient queueClient, int count, TimeSpan waitTimeout)
{
    // Use a wait handle (a manual reset event) to report on completion of the async receive operations.
    var waitObject = new ManualResetEvent(false);

    // Use a retry policy to improve reliability of messaging operations.
    var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(RetryPolicy.DefaultClientRetryCount);

    // Create an internal queue of messages we received from the Service Bus queue.
    var queueMessages = new ConcurrentQueue<T>();

    try
    {
        for (int i = 0; i < count; i++)
        {
            // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
            retryPolicy.ExecuteAction
            (
                (cb) =>
                {
                    // Start receiving a new message asynchronously.
                    queueClient.BeginReceive(waitTimeout, cb, null);
                },
                (ar) =>
                {
                    // Complete the asynchronous operation. This may throw an exception that will be handled internally by retry policy.
                    BrokeredMessage msg = queueClient.EndReceive(ar);

                    // Check if we actually received any messages.
                    if (msg != null)
                    {
                        try
                        {
                            // Retrieve the message body. We can only consume the body once. Once consumed, it's no longer retrievable.
                            T msgBody = msg.GetBody<T>();

                            // Add the message body to the internal list.
                            queueMessages.Enqueue(msgBody);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.Complete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. This will cause Service Bus to unlock the message and make it available
                                // to be received again, either by the same consumer or by another competing consumer.
                                msg.Abandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }

                        // Once the requested number of messages has been received, signal completion.
                        if (queueMessages.Count == count)
                        {
                            // Signal the end of the messaging operation.
                            waitObject.Set();
                        }
                    }
                },
                (ex) =>
                {
                    // Always log exceptions.
                    Trace.TraceError(ex.Message);
                }
            );
        }

        // Wait until all async receive operations are completed.
        waitObject.WaitOne(waitTimeout);
    }
    catch (Exception ex)
    {
        // We intend to never fail when fetching messages from a queue. We will still need to report an exception.
        Trace.TraceError(ex.Message);
    }
    finally
    {
        if (waitObject != null)
        {
            waitObject.Dispose();
        }
    }

    return queueMessages;
}

In line with the recommendation supplied in the previous section, it is best to use the asynchronous programming model integration provided by Task Parallel Library for parallelizing the asynchronous message receive operation.
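
As an illustration of that recommendation, a Begin/End receive pair can be wrapped in a task with the FromAsync method of the Task Parallel Library. The following is a minimal sketch rather than part of the original sample: the ReceiveTasks class and the ReceiveAsync helper are hypothetical names, and the helper accepts the Begin/End methods as delegates so that it does not depend on a particular client type.

```csharp
using System;
using System.Threading.Tasks;

public static class ReceiveTasks
{
    // Hypothetical helper: wraps an APM-style Begin/End receive pair into a Task.
    public static Task<TMessage> ReceiveAsync<TMessage>(
        Func<TimeSpan, AsyncCallback, object, IAsyncResult> beginReceive,
        Func<IAsyncResult, TMessage> endReceive,
        TimeSpan waitTimeout)
    {
        // FromAsync invokes beginReceive(waitTimeout, callback, state) and completes
        // the returned task with the result of endReceive once the operation finishes.
        // Any exception thrown by endReceive is surfaced through the task.
        return Task<TMessage>.Factory.FromAsync(beginReceive, endReceive, waitTimeout, null);
    }
}
```

With a queue client, the call would look something like ReceiveTasks.ReceiveAsync<BrokeredMessage>(queueClient.BeginReceive, queueClient.EndReceive, waitTimeout). Several such tasks can be started together and observed with Task.WaitAll, which removes the need for the manual wait handle used in the sample above.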

Implementing Reliable Message Receive Loops

Through observations from several customer projects leveraging the brokered messaging API, we noticed that message receive logic is often implemented as a canonical, repeated pattern without a sound approach to handling potential anomalies. Generally, such logic doesn’t allow for edge cases such as expired message locks, which makes it error-prone. The purpose of this section is to provide some specific recommendations for implementing reliable message receive logic.

First, it is important to note the two distinct modes in which messages can be received from the Service Bus. These modes are provided by the brokered messaging API to support message delivery using either “At Most Once” (with ReceiveMode.ReceiveAndDelete) or “At Least Once” (with ReceiveMode.PeekLock) semantics.

The first mode is ReceiveAndDelete, which is the simplest model and works best for scenarios in which the application can tolerate a failure in message processing. When using the ReceiveAndDelete mode, the receive action is a single-hop operation during which a message delivered to the client is marked as being consumed and subsequently removed from the respective queue or subscription.

The second mode is PeekLock, which prescribes that a received message is to remain hidden from other consumers until its lock timeout expires. With the PeekLock mode, the receive process becomes a two-stage operation making it possible to support applications that cannot tolerate failed messages. In addition to issuing a request to receive a new message (first stage), the consuming application is required to indicate when it has finished processing the message (second stage). After the application finishes processing the message, or stores (defers) it reliably for future processing, it completes the second stage of the receive process by calling the Complete() method on the received message.

When you specify PeekLock mode, you should always finalize the successful processing of a message by calling the Complete() method, which tells the Service Bus to mark the message processing as completed. Failure to call the Complete() method on a message received in PeekLock mode will result in the message re-appearing in a queue or subscription after its lock timeout expires. Consequently, you will receive the previously processed message again, and this may result in a duplicate message being processed.

In addition, when using PeekLock mode, you should tell the Service Bus if a message cannot be successfully processed and therefore must be returned for subsequent redelivery. Whenever possible, your messaging solution should handle this situation by calling the Abandon() method, instead of waiting until the lock acquired for the message expires. Ideally, you will call the Abandon() method from within the catch block of the try/catch construct that wraps your message handling logic.
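
The difference between the two receive modes can be pictured with a toy in-memory model. The sketch below is not the Service Bus API: the ToyQueue class and its members are hypothetical, lock timeouts are not modeled, and messages are identified by their text, so each message is assumed to be unique.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of the two receive modes. This is not the Service Bus API.
public sealed class ToyQueue
{
    private readonly Queue<string> available = new Queue<string>();
    private readonly HashSet<string> locked = new HashSet<string>();

    // Number of messages the queue still owns (visible plus locked).
    public int Count { get { return available.Count + locked.Count; } }

    public void Send(string message) { available.Enqueue(message); }

    // "At Most Once": the message is removed as soon as it is handed out.
    public string ReceiveAndDelete()
    {
        return available.Count > 0 ? available.Dequeue() : null;
    }

    // "At Least Once", first stage: the message is hidden but still owned by the queue.
    public string PeekLock()
    {
        if (available.Count == 0) return null;
        string message = available.Dequeue();
        locked.Add(message);
        return message;
    }

    // Second stage on success: the message is removed for good.
    public void Complete(string message) { locked.Remove(message); }

    // Second stage on failure: the message becomes visible again for redelivery.
    public void Abandon(string message)
    {
        if (locked.Remove(message)) available.Enqueue(message);
    }
}
```

In this model, a consumer that crashes after ReceiveAndDelete() loses the message, whereas a consumer that calls PeekLock() and then Abandon() leaves the message in the queue for another attempt.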

It is important to ensure that message processing happens strictly within the designated lock period. In the brokered messaging functionality introduced with the current release of the Service Bus, the maximum message lock duration is 5 minutes, and this duration cannot currently be extended at runtime. If a message takes longer to process than the lock duration set on a queue or subscription, its visibility lock will time out and the message will again become available to the consumers of the queue or subscription. If you attempt to complete or abandon such a message, you may receive a MessageLockLostException error that indicates there is no valid lock found for the given message.

To implement a robust message receive loop, build resilience against all known transient errors as well as any abnormalities that can manifest themselves during or after message processing. This is especially important when receiving messages using PeekLock mode. Because there is always a second stage involved in PeekLock mode, you should never assume that a message successfully processed on the client can be reliably marked as completed in the Service Bus backend. For example, a fault in the underlying network layer may prevent the Complete() call from reaching the service. As a result, you may receive the same message more than once, so your processing logic must be idempotent. This behavior is in line with many other messaging solutions that operate in the “At Least Once” message delivery mode.
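
One possible way to cope with duplicate deliveries is to remember the identifiers of messages that have already been processed and to skip the processing step when the same identifier shows up again. The following is a minimal sketch under several assumptions: the ProcessedMessageTracker class is hypothetical, it keeps its state in memory within a single process, and it relies on each message carrying a stable identifier such as the MessageId property of a BrokeredMessage. A production solution would more likely keep this state in a durable store.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper that remembers which message IDs were already processed.
public sealed class ProcessedMessageTracker
{
    private readonly ConcurrentDictionary<string, bool> processedIds =
        new ConcurrentDictionary<string, bool>();

    // Runs the handler only the first time a given message ID is seen.
    // Returns true if the handler ran, false if the message was a duplicate.
    public bool ProcessOnce(string messageId, Action handler)
    {
        if (!processedIds.TryAdd(messageId, true))
        {
            // Duplicate delivery: skip processing. The caller should still complete the message.
            return false;
        }

        try
        {
            handler();
            return true;
        }
        catch
        {
            // Processing failed, so forget the ID and allow a redelivered copy to be processed.
            bool ignored;
            processedIds.TryRemove(messageId, out ignored);
            throw;
        }
    }
}
```

Inside a receive loop, the processing step could then be written as tracker.ProcessOnce(msg.MessageId, () => processMessage(msg)), followed by the usual call to complete the message regardless of whether the handler ran.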

You can add additional resilience when calling the Complete() and Abandon() methods by using extension methods. For example:

public static bool SafeComplete(this BrokeredMessage msg)
{
    try
    {
        // Mark brokered message as complete.
        msg.Complete();

        // Return a result indicating that the message has been completed successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, or the underlying topic/subscription may have been removed.
        // If Complete() fails with this exception, the only recourse is to prepare to receive another message (possibly the same one).
    }
            
    return false;
}

public static bool SafeAbandon(this BrokeredMessage msg)
{
    try
    {
        // Abandons a brokered message. This will cause the Service Bus to unlock the message and make it available to be received again, 
        // either by the same consumer or by another competing consumer.
        msg.Abandon();

        // Return a result indicating that the message has been abandoned successfully.
        return true;
    }
    catch (MessageLockLostException)
    {
        // It's too late to compensate the loss of a message lock. We should just ignore it so that it does not break the receive loop.
        // We should be prepared to receive the same message again.
    }
    catch (MessagingException)
    {
        // There is nothing we can do as the connection may have been lost, or the underlying topic/subscription may have been removed.
        // If Abandon() fails with this exception, the only recourse is to receive another message (possibly the same one).
    }

    return false;
}

A similar approach can be extended to shield other messaging operations such as Defer from potential failures. The pattern in which the above extension methods can be used is reflected in the code snippet below. This code fragment demonstrates how to implement a receive loop while taking advantage of the additional resilience provided by the extension methods:

var waitTimeout = TimeSpan.FromSeconds(10);

// Declare an action acting as a callback whenever a message arrives on a queue.
AsyncCallback completeReceive = null;

// Declare an action acting as a callback whenever a non-transient exception occurs while receiving or processing messages.
Action<Exception> recoverReceive = null;

// Declare a cancellation token that is used to signal an exit from the receive loop.
var cts = new CancellationTokenSource();

// Declare an action implementing the main processing logic for received messages.
Action<BrokeredMessage> processMessage = ((msg) =>
{
    // Put your custom processing logic here. DO NOT swallow any exceptions.
});

// Declare an action responsible for the core operations in the message receive loop.
Action receiveMessage = (() =>
{
    // Use a retry policy to execute the Receive action in an asynchronous and reliable fashion.
    retryPolicy.ExecuteAction
    (
        (cb) =>
        {
            // Start receiving a new message asynchronously.
            queueClient.BeginReceive(waitTimeout, cb, null);
        },
        (ar) =>
        {
            // Make sure we are not told to stop receiving while we were waiting for a new message.
            if (!cts.IsCancellationRequested)
            {
                // Complete the asynchronous operation. This may throw an exception that will be handled internally by retry policy.
                BrokeredMessage msg = queueClient.EndReceive(ar);

                // Check if we actually received any messages.
                if (msg != null)
                {
                    // Make sure we are not told to stop receiving while we were waiting for a new message.
                    if (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Process the received message.
                            processMessage(msg);

                            // With PeekLock mode, we should mark the processed message as completed.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Mark brokered message as completed at which point it's removed from the queue.
                                msg.SafeComplete();
                            }
                        }
                        catch
                        {
                            // With PeekLock mode, we should mark the failed message as abandoned.
                            if (queueClient.Mode == ReceiveMode.PeekLock)
                            {
                                // Abandons a brokered message. This will cause Service Bus to unlock the message and make it available
                                // to be received again, either by the same consumer or by another competing consumer.
                                msg.SafeAbandon();
                            }

                            // Re-throw the exception so that we can report it in the fault handler.
                            throw;
                        }
                        finally
                        {
                            // Ensure that any resources allocated by a BrokeredMessage instance are released.
                            msg.Dispose();
                        }
                    }
                    else
                    {
                        // If we were told to stop processing, the current message needs to be unlocked and returned to the queue.
                        if (queueClient.Mode == ReceiveMode.PeekLock)
                        {
                            msg.SafeAbandon();
                        }

                        // Ensure that any resources allocated by a BrokeredMessage instance are released.
                        msg.Dispose();
                    }
                }
            }

            // Invoke a custom callback method to indicate that we have completed an iteration in the message receive loop.
            completeReceive(ar);
        },
        (ex) =>
        {
            // Invoke a custom action to indicate that we have encountered an exception and
            // need further decision as to whether to continue receiving messages.
            recoverReceive(ex);
        });
});

// Initialize a custom action acting as a callback whenever a message arrives on a queue.
completeReceive = ((ar) =>
{
    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop.
        receiveMessage();
    }
});

// Initialize a custom action acting as a callback whenever a non-transient exception occurs while receiving or processing messages.
recoverReceive = ((ex) =>
{
    // Just log an exception. Do not allow an unhandled exception to terminate the message receive loop abnormally.
    Trace.TraceError(ex.Message);

    if (!cts.IsCancellationRequested)
    {
        // Continue receiving and processing new messages until we are told to stop regardless of any exceptions.
        receiveMessage();
    }
});

// Start receiving messages asynchronously.
receiveMessage();

// Perform any other work. Message will keep arriving asynchronously while we are busy doing something else.

// Stop the message receive loop gracefully.
cts.Cancel();

The above example implements an advanced approach to receiving messages asynchronously in the order in which they appear on a queue. It ensures that any error encountered during processing results in the message being abandoned and returned to the queue so that it can be re-processed. The extra code is justified by its support for graceful cancellation of the message receive loop.

Conclusion

The cloud-based messaging and communication infrastructure provided by the latest release of the Service Bus supports reliable message queuing and durable publish/subscribe messaging capabilities. Such “brokered” messaging services may be subject to quotas on the active concurrent connections maintained by messaging entities (such as a queue, topic or subscription). For that reason, this article detailed some best practices for managing the lifecycle of such entities and messaging objects, and provided guidance on building your applications with an awareness of resource efficiency.

Of equal importance, when building solutions that depend on such cloud-based technologies, you should build reliability and resilience into your code, and this article has offered guidance and some real-world examples of how to do so. Such robust practices help cloud-based applications deal with anomalies that may be out of your control, such as transient network communication errors that can manifest themselves in multi-tenant, cloud-based environments.

Finally, this article has provided best practices to help you design code that is robust and efficient, while leveraging some of the advanced features of the Service Bus, such as sending and receiving messages asynchronously, and implementing reliable message loops as part of that process.

Check back here often as we continue to post more best practices guidance on our blog. As always, please send us your comments and feedback.

Authored by: Valery Mizonov
Contributed by: Seth Manheim, Paolo Salvatori, James Podgorski, Eric Lam, Jayu Katti
