Sunday, April 10, 2022

Robust Messaging using Azure Service Bus

What is Service Bus

  • Fully managed enterprise Message Broker

  • Decouple applications and services from each other

  • Message Queues and Publish/Subscribe Topics

  • Load-balancing across Competing workers

  • High degree of Reliability and Throughput

 

What Will This Document Do?

  • Differentiate between Messages and Events

  • Look at the Orchestrators, such as Service Bus, through which services can Produce and Consume Messages

  • Deep dive into Service Bus Concepts and Advanced Features

  • Detailed look at some code examples for Producers and Consumers

  • Code examples discussed are available at Source Code

 

What is a Message?

about-service-bus-queue.png

 

about-service-bus-topic.png

 

  • Raw Data Produced by a service to be Consumed or Stored elsewhere

  • Contains the data that triggered the Message pipeline

  • Publisher of the Message has an expectation about how the Consumer handles the Message

  • Contract exists between the two sides

    • Who is sending the message?

    • What is the message?

    • Where is the message being sent?

 

Service Bus - Deep Dive

  • Intended for traditional enterprise applications which require Transactions, Ordering, Duplicate detection, and Instantaneous Consistency

  • Enables cloud-native applications to provide reliable state transition management for business processes

  • Facilitates Highly Secure and Reliable communication across hybrid cloud solutions and can connect existing on-premises systems to cloud solutions

  • Brokered Messaging system. It stores messages in a broker (for example, a queue) until the consuming party is ready to receive the messages

  • Delivers discrete Messages through Queues and Topics, rather than the Data Streams or Event Batches that characterize Event Hubs

  • Reliable Asynchronous message delivery (enterprise messaging as a service) that requires polling

  • Advanced messaging features like FIFO, Sessions, Transaction Scopes, Dead-lettering, Filtering, and Duplicate Detection

  • At least once delivery

  • Optional in-order delivery

 

Queues

about-service-bus-queue.png

  • Offer First In, First Out (FIFO) message delivery to one or more competing consumers

  • Receivers typically Receive and Process messages in the order in which they were added to the queue, and only one message consumer receives and processes each message

  • Producers (senders) and Consumers (receivers) do not have to be sending and receiving messages at the same time, because messages are stored durably in the queue

  • Producer does not have to wait for a reply from the Consumer in order to continue to process and send messages

  • A related benefit is Load Levelling, which enables producers and consumers to send and receive messages at different rates

  • Brokered Transfer

brokered-messaging.png

  • Maximize Availability - delays arising in services won't have an immediate and direct impact on the application

  • Producer can continue to post messages to the queue even when the service isn't available or isn't currently processing messages

  • Consumer can process at its own pace, without being overloaded

  • Maximize Scalability because both the number of queues and the number of services can be varied to meet demand

Competing Consumer

competing-consumers.png

  • Multiple Receivers compete for messages on the same Queue

  • Provides Automatic Load Balancing of work to receivers volunteering for Jobs

  • Improves Reliability

    • Messages aren't sent to a specific service instance

    • A failed service instance won't block a Producer

    • Messages can be processed by any working service instance.

  • Scalable - The system can dynamically increase or decrease the number of instances of the consumer service as the volume of messages fluctuates.

  • Improve Resiliency

    • Consumer service instance reads and processes the Message as part of a transactional operation

    • If the Consumer service instance fails, this pattern can ensure that the message will be returned to the queue to be picked up and handled by another instance of the consumer service
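The competing-consumer pattern above can be sketched with the processor type from the Azure.Messaging.ServiceBus package. This is a minimal sketch, not a definitive implementation: the connection string, queue name, worker name, the concurrency value of 4 and the 30-second run time are all placeholders chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class CompetingConsumerSketch
{
    // Options shared by every worker instance; the values are illustrative.
    public static ServiceBusProcessorOptions BuildOptions() => new ServiceBusProcessorOptions
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock,
        AutoCompleteMessages = false,   // settle explicitly once the work has succeeded
        MaxConcurrentCalls = 4          // up to four messages handled in parallel per instance
    };

    public static async Task RunWorkerAsync(string connectionString, string queueName, string workerName)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using var processor = client.CreateProcessor(queueName, BuildOptions());

        processor.ProcessMessageAsync += async args =>
        {
            Console.WriteLine($"{workerName} handling {args.Message.MessageId}");
            await args.CompleteMessageAsync(args.Message);
        };
        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"{workerName} error: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        await Task.Delay(TimeSpan.FromSeconds(30));   // process for a while, then shut down
        await processor.StopProcessingAsync();
    }
}
```

Starting several processes that each call RunWorkerAsync against the same queue gives the competing receivers described above; no receiver needs to know about the others.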

Topics and Subscriptions

about-service-bus-topic.png

  • Provides a one-to-many form of communication, in a Publish/Subscribe pattern

  • Useful for scaling to large numbers of Recipients; each published message is made available to each subscription registered with the Topic

  • Messages are sent to a Topic and delivered to one or more associated subscriptions

 

Pre-Fetching

prefetching.png

  • Prefetch feature can be enabled for a Service Bus client

  • The Receiver acquires more Messages than the application initially asked for

  • With Pre-fetching enabled, Service Bus behaves differently in the following two scenarios

    • Receive-and-Delete mode

      • Messages acquired into the Prefetch Buffer are no longer available in the queue

      • Messages exist in the in-memory Prefetch Buffer until received into the application

      • Messages are irrecoverably Lost if the client application ends before the Messages are Received into the application

    • Peek-Lock mode

      • Messages fetched into the Prefetch Buffer are in a Locked state

      • If the Prefetch Buffer is large and processing takes longer than the message lock period, the subscribing client has to take appropriate measures as per the requirement

      • If the Lock expires Silently in the Prefetch Buffer, the Message is treated as Abandoned and is again made available for retrieval from the Queue

      • If the requirement is to achieve High Reliability for message processing and processing takes significant work and time, the recommendation is to use the Prefetch feature conservatively, or not at all

      • If the requirement is to achieve High Throughput and message processing is generally quick, the recommendation is to enable Prefetch and gain significant throughput benefits
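The trade-off above comes down to one receiver setting. The sketch below assumes the Azure.Messaging.ServiceBus package; the prefetch value of 50, the 5-second wait and the method names are illustrative assumptions rather than recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class PrefetchSketch
{
    // Prefetch only when handling is quick; 0 leaves the prefetch buffer disabled.
    public static ServiceBusReceiverOptions BuildOptions(bool fastProcessing) => new ServiceBusReceiverOptions
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock,
        PrefetchCount = fastProcessing ? 50 : 0   // 50 is an illustrative value
    };

    public static async Task DrainAsync(string connectionString, string queueName, bool fastProcessing)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using var receiver = client.CreateReceiver(queueName, BuildOptions(fastProcessing));

        ServiceBusReceivedMessage message;
        while ((message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5))) != null)
        {
            // With prefetch on, the lock may have been taken some time before the message
            // reached the application, so settle it promptly.
            await receiver.CompleteMessageAsync(message);
        }
    }
}
```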

 

Service Bus - Reliability

Throttling

  • Throttling can originate from an external system on which Service Bus depends

  • It occurs from interactions with storage and compute resources

  • Enable Partitioning on a Queue or Topic to reduce the probability of throttling

Issue for an Azure dependency

  • An issue can arise in a system on which Service Bus depends. For example, a given part of storage can encounter issues

  • To work around these types of issues, Service Bus regularly investigates and implements mitigations

  • Due to the nature of the mitigation, a sent message can take up to 15 minutes to appear in the affected queue or subscription and be ready for a receive operation

Service Bus failure on a single subsystem

  • Failure of Service Bus on single subsystem. In this situation, a compute node can get into an inconsistent state and must restart itself, causing all entities it serves to load balance to other nodes. This in turn can cause a short period of slow message processing

  • The client application generates a System.TimeoutException or MessagingException exception. Service Bus contains a mitigation for this issue in the form of automated client retry logic. If the retry period is exhausted and the message is still not delivered, the exception surfaces to the application, which must then handle the failure

 

Peek Lock – At least once

peek-lock.png

  • Sender sends the messages.

  • Receiver locks the message from other receivers.

  • The next Receiver locks the next message.

  • Complete removes the message from the queue.

  • Abandon gives up lock and makes it available for the next receiver
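A minimal sketch of the Complete and Abandon steps above, assuming the Azure.Messaging.ServiceBus package. TryProcess is a hypothetical stand-in for real business logic, and the 5-second wait is an arbitrary choice.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class PeekLockSketch
{
    // Hypothetical business rule standing in for real processing.
    public static bool TryProcess(string body) => !string.IsNullOrWhiteSpace(body);

    public static async Task ReceiveOneAsync(ServiceBusClient client, string queueName)
    {
        var options = new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.PeekLock };
        await using ServiceBusReceiver receiver = client.CreateReceiver(queueName, options);

        ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        if (message == null) return;   // nothing arrived within the wait time

        if (TryProcess(message.Body.ToString()))
            await receiver.CompleteMessageAsync(message);   // removes the message from the queue
        else
            await receiver.AbandonMessageAsync(message);    // releases the lock for the next receiver
    }
}
```

Because an abandoned message becomes available again, the same message can be delivered more than once, which is why this mode is described as at-least-once.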

 

Receive and Delete – At most once

receive-n-delete.png

  • Sender sends the messages

  • Receiver receives the message and deletes from the queue

  • The next receiver receives and Deletes the next message

  • Abandon and Complete operations are not needed

 

Message Sessions

sb-sessions.png

  • Sessions allow pinning sets of related messages to a receiver even when using competing consumers

  • Session state may be used to store intermediate state for a session

  • A Session Receiver is created by a client accepting a session

  • When the session is accepted and held by a client, the client holds an exclusive lock on all messages with that session's session ID in the queue or subscription. It will also hold exclusive locks on all messages with the session ID that will arrive later

  • The Lock is released when you call close methods on the receiver or when the lock Expires. There are methods on the receiver to Renew the locks as well

  • When multiple Concurrent Receivers pull from the Queue, the messages belonging to a particular Session are dispatched to the specific Receiver that currently holds the lock for that session

  • The Session Lock held by the session receiver is an umbrella for the message locks used by the peek-lock settlement mode

  • Only one Receiver can have a lock on a session

  • A Receiver may have many in-flight messages, but the messages will be received in order

  • Abandoning a message causes the same message to be served again with the next receive operation

sessions-2.png
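The session behaviour described above can be sketched as follows, assuming the Azure.Messaging.ServiceBus package and a queue that was created with sessions enabled. The session ID, message bodies and the use of the sequence number as session state are illustrative assumptions.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SessionSketch
{
    // Messages that share a SessionId are pinned to whichever receiver holds that session.
    public static ServiceBusMessage BuildMessage(string sessionId, string body) =>
        new ServiceBusMessage(body) { SessionId = sessionId };

    public static async Task ProcessNextSessionAsync(ServiceBusClient client, string queueName)
    {
        // Locks whichever session is available next; this throws a ServiceBusException
        // if no session becomes available before the operation times out.
        await using ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync(queueName);

        ServiceBusReceivedMessage message;
        while ((message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5))) != null)
        {
            Console.WriteLine($"{receiver.SessionId}: {message.Body}");

            // Record progress in the session state so another receiver could resume later.
            await receiver.SetSessionStateAsync(new BinaryData(message.SequenceNumber.ToString()));
            await receiver.CompleteMessageAsync(message);
        }
    }
}
```

Disposing the receiver at the end of the method releases the session lock, so another receiver can accept the same session afterwards.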

 

 

Duplicate Detection

  • Helps keep track of the application-controlled MessageId of all messages sent into a queue or topic during a specified time window

  • If a new message is sent with a MessageId that was logged during the time window, the message is reported as accepted (the send operation succeeds)

  • The newly sent message is nevertheless instantly Ignored and Dropped. No parts of the message other than the MessageId are considered

  • Application control of the Identifier is essential, because only that allows the application to tie the MessageId to a business process context from which it can be predictably reconstructed when a failure occurs

  • Duplicate detection history time window can be configured during which message-ids are retained. This value defaults to 10 minutes for queues and topics, with a minimum value of 20 seconds to maximum value of 7 days

  • Enabling duplicate detection and the size of the window directly impact the queue (and topic) throughput, since all recorded message-ids must be matched against the newly submitted message identifier

  • Keeping the window small means that fewer message-ids must be retained and matched and hence less impact on Throughput

  • For high throughput entities that require Duplicate Detection, ideally keep the window as small as possible
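As a sketch of the points above, the following assumes the Azure.Messaging.ServiceBus package and a connection string with Manage rights. The one-minute window, the "order:step" identifier scheme and all names are hypothetical choices for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public static class DuplicateDetectionSketch
{
    public static CreateQueueOptions BuildQueueOptions(string queueName) => new CreateQueueOptions(queueName)
    {
        RequiresDuplicateDetection = true,
        // Kept short to limit the throughput cost; illustrative value.
        DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1)
    };

    // Deterministic identifier derived from business context, so a retry reproduces it.
    public static string BuildMessageId(string orderId, string step) => $"{orderId}:{step}";

    public static async Task SendTwiceAsync(string connectionString, string queueName)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);
        bool exists = await admin.QueueExistsAsync(queueName);
        if (!exists)
            await admin.CreateQueueAsync(BuildQueueOptions(queueName));

        await using var client = new ServiceBusClient(connectionString);
        await using var sender = client.CreateSender(queueName);

        string id = BuildMessageId("order-42", "payment");

        // Both sends succeed, but the second copy is dropped inside the detection window.
        await sender.SendMessageAsync(new ServiceBusMessage("charge card") { MessageId = id });
        await sender.SendMessageAsync(new ServiceBusMessage("charge card") { MessageId = id });
    }
}
```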

 

 

Schedule Messages

  • You can submit messages to a queue or topic for delayed processing; for example, to schedule a job to become available for processing by a system at a certain time. This capability realizes a reliable distributed time-based scheduler.

  • Scheduled messages do not materialize in the queue until the defined enqueue time. Before that time, scheduled messages can be canceled. Cancellation deletes the message

  • The SequenceNumber for a scheduled message is only valid while the message is in this state. As the message transitions to the active state, the message is appended to the queue as if it had been enqueued at the current instant, which includes assigning a new SequenceNumber

  • Because the feature is anchored on individual messages and messages can only be enqueued once, Service Bus does not support recurring schedules for messages
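Scheduling and cancelling can be sketched as below, assuming the Azure.Messaging.ServiceBus package. The message body, the ten-minute delay and the method names are placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ScheduleSketch
{
    // Schedules a message and returns the sequence number needed to cancel it later.
    public static async Task<long> ScheduleAsync(ServiceBusSender sender, string body, TimeSpan delay)
    {
        DateTimeOffset enqueueTime = DateTimeOffset.UtcNow.Add(delay);
        return await sender.ScheduleMessageAsync(new ServiceBusMessage(body), enqueueTime);
    }

    // Only valid while the message is still scheduled, that is, before its enqueue time.
    public static Task CancelAsync(ServiceBusSender sender, long sequenceNumber) =>
        sender.CancelScheduledMessageAsync(sequenceNumber);

    public static async Task RunAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using var sender = client.CreateSender(queueName);

        long sequenceNumber = await ScheduleAsync(sender, "nightly job", TimeSpan.FromMinutes(10));
        await CancelAsync(sender, sequenceNumber);
    }
}
```

A recurring job would have to schedule its own successor each time it runs, since the broker enqueues each scheduled message only once.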

 

 

Dead Letter Messages

dead-letter.png

  • A Secondary sub-queue, called a Dead-Letter queue (DLQ)

  • The dead-letter queue doesn't need to be explicitly created

  • Can't be deleted or managed independent of the main entity

  • Holds Messages that can't be delivered to any receiver, or Messages that couldn't be processed

  • Messages can then be removed from the DLQ and inspected. An application might rectify issues and resubmit the message

  • Several Activities in Service Bus can cause messages to get pushed to the DLQ

  • Receiving Application can also explicitly move messages to the DLQ
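The sketch below illustrates both sides of the DLQ, assuming the Azure.Messaging.ServiceBus package: a receiver that explicitly dead-letters a message, and a repair step that inspects and resubmits one. The reason and description strings are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeadLetterSketch
{
    public static ServiceBusReceiverOptions BuildDeadLetterOptions() => new ServiceBusReceiverOptions
    {
        SubQueue = SubQueue.DeadLetter,
        ReceiveMode = ServiceBusReceiveMode.PeekLock   // keep the message until inspection succeeds
    };

    // Consumer side: explicitly move a message that can never be processed to the DLQ.
    public static Task RejectAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message) =>
        receiver.DeadLetterMessageAsync(message, "InvalidPayload", "Body could not be parsed");

    // Repair side: inspect one dead-lettered message and resubmit it to the main queue.
    public static async Task ResubmitOneAsync(ServiceBusClient client, string queueName)
    {
        await using var dlqReceiver = client.CreateReceiver(queueName, BuildDeadLetterOptions());
        await using var sender = client.CreateSender(queueName);

        ServiceBusReceivedMessage dead = await dlqReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        if (dead == null) return;   // the DLQ is empty

        Console.WriteLine($"Reason: {dead.DeadLetterReason}, deliveries: {dead.DeliveryCount}");
        await sender.SendMessageAsync(new ServiceBusMessage(dead));   // new message built from the received one
        await dlqReceiver.CompleteMessageAsync(dead);                  // remove it from the DLQ
    }
}
```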

 

 

Deferred Messages

defer-messages.png

  • Deferral is a feature created specifically for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order

  • They may have to postpone processing of some received messages until prescribed prior work that's informed by other messages has been completed

  • Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed

  • If a message can't be processed because a particular resource for handling that message is temporarily unavailable but message processing shouldn't be summarily suspended, a way to put that message on the side for a few minutes is to remember the sequence number in a scheduled message to be posted in a few minutes, and re-retrieve the deferred message when the scheduled message arrives

  • If a message handler depends on a database for all operations and that database is temporarily unavailable, it shouldn't use deferral, but rather suspend receiving messages altogether until the database is available again

 

Retrieving deferred messages

  • Deferred messages remain in the main queue along with all other active messages (unlike dead-letter messages that live in a subqueue), but they can no longer be received using the regular receive operations. Deferred messages can be discovered via message browsing if an application loses track of them

  • To retrieve a deferred message, its owner is responsible for remembering the sequence number as it defers it

  • Any receiver that knows the sequence number of a deferred message can later receive the message by using receive methods that take the sequence number as a parameter. For more information about sequence numbers, see Message sequencing and timestamps
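Deferring and later retrieving a message can be sketched as follows, assuming the Azure.Messaging.ServiceBus package. DeferredLedger is a hypothetical in-memory helper that remembers sequence numbers; a real application would likely persist them, since an in-memory record is lost if the process stops.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical in-memory record of deferred messages, keyed by a business identifier.
public sealed class DeferredLedger
{
    private readonly Dictionary<string, long> _pending = new Dictionary<string, long>();

    public void Remember(string key, long sequenceNumber) => _pending[key] = sequenceNumber;

    // Returns true and removes the entry when the key is known.
    public bool TryTake(string key, out long sequenceNumber) => _pending.Remove(key, out sequenceNumber);
}

public static class DeferralSketch
{
    public static async Task DeferAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message,
                                        DeferredLedger ledger, string key)
    {
        ledger.Remember(key, message.SequenceNumber);   // remember the number before deferring
        await receiver.DeferMessageAsync(message);
    }

    public static async Task ResumeAsync(ServiceBusReceiver receiver, DeferredLedger ledger, string key)
    {
        if (!ledger.TryTake(key, out long sequenceNumber)) return;   // nothing deferred under this key

        ServiceBusReceivedMessage deferred = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
        await receiver.CompleteMessageAsync(deferred);
    }
}
```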

 

Auto Forwarding

auto-forwarding.png

  • If the destination entity accumulates too many messages and exceeds the quota, or the destination entity is disabled, the source entity adds the messages to its dead-letter queue until there is space in the destination (or the entity is re-enabled)

  • Messages continue to live in the dead-letter queue, so you must explicitly receive and process them from the dead-letter queue

  • A first-level topic with 20 subscriptions, each of them chained to a second-level topic with 200 subscriptions, allows for higher throughput than a first-level topic with 200 subscriptions, each chained to a second-level topic with 20 subscriptions

  • To create a subscription that is chained to another queue or topic, the creator of the subscription must have Manage permissions on both the source and the destination entity. Sending messages to the source topic only requires Send permissions on the source topic

  • Messages that exceed 4 hops are dead-lettered

sb-fwd-1.gif

sb-fwd-2.gif
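A chained subscription of the kind described above can be created through the administration client. This is a sketch assuming the Azure.Messaging.ServiceBus package, a connection string with Manage rights, and a destination that already exists; the entity names "orders", "to-billing" and "billing-queue" are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class AutoForwardSketch
{
    public static CreateSubscriptionOptions BuildForwardingSubscription(
        string topicName, string subscriptionName, string destinationName) =>
        new CreateSubscriptionOptions(topicName, subscriptionName)
        {
            ForwardTo = destinationName   // a queue or topic in the same namespace
        };

    public static async Task CreateAsync(string connectionString)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        // Messages arriving in the "to-billing" subscription are moved on to "billing-queue".
        await admin.CreateSubscriptionAsync(
            BuildForwardingSubscription("orders", "to-billing", "billing-queue"));
    }
}
```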

 

 

Transaction Processing

transaction-processing.png

  • Groups two or more operations together into an execution scope

  • Ensure that all Operations belonging to a given group of operations either Succeed or Fail jointly

  • Transactions act as one unit, which is often referred to as atomicity

  • Send several messages to one Queue/Topic from within a transaction scope

    • Messages will only be committed when the Transaction successfully completes.

  • Operations available within a transaction scope -

    • Send

    • Complete

    • Abandon

    • Dead letter

    • Defer

    • Renew lock

  • Message from Sender reaches Transfer Queue or Topic

    • Transfer Queue or Topic immediately moves the message to the intended Destination Queue or Topic

    • Message is never Committed to the Transfer Queue or Topic and hence is not visible to the Consumers

  • Order of Operations within the Transaction Scope is important, e.g.

    • Receive - Read from the Transfer Queue or Topic

    • Send1 - Send to Queue1 or Topic1

    • Send2 - Send to Queue2 or Topic2

 

 

Code Examples

How to Send Messages

SendQueueMessageAsync

 

private async Task<ResponseModel> SendQueueMessageAsync
                                          (string queueNameString, HeaderModel headerModel,
                                           List<MessageModel> messagesList)
{

    kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString);
    var serviceBusSender = kServiceBusClient.CreateSender(queueNameString);            
    var serviceBusMessagesList = PrepareAllQueueMessages(messagesList);
    ResponseModel responseModel = null;

    try
    {

      await serviceBusSender.SendMessagesAsync(serviceBusMessagesList);
      responseModel = new ResponseModel()
      {

        Code = 200,
        Message = $"message batch sent:{serviceBusMessagesList.Count}"

      };
    }
    catch(ServiceBusException ex)
    {

      responseModel = new ResponseModel()
      {

        Code = 400,
        Message = ex.Message

      };
    }
    finally
    {
      await serviceBusSender.DisposeAsync();
    }

    return responseModel;

}

 

 

 

ScheduleQueueMessageAsync

 

private async Task<List<ResponseModel>> ScheduleQueueMessageAsync
                                                (string queueNameString,
                                                 HeaderModel headerModel,
                                                 List<MessageModel> messagesList,
                                                 Dictionary<string, int> queryStringMap)
{

    kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString);
    var serviceBusSender = kServiceBusClient.CreateSender(queueNameString);            
    var serviceBusMessagesList = PrepareAllQueueMessages(messagesList);                
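    // "delayBy" is assumed to be expressed in seconds; using it directly avoids
    // integer division truncating delays shorter than a minute to zero
    var scheduledEnqueueTime = DateTimeOffset.UtcNow.AddSeconds(queryStringMap["delayBy"]);
    var responseModelsList = new List<ResponseModel>();

    try
    {

      var scheduledTasksList = serviceBusMessagesList.Select
        (async (ServiceBusMessage serviceBusMessage) =>
         {

           // Local variable: a shared one would be overwritten by concurrently running tasks
           long scheduleSequence = await serviceBusSender.ScheduleMessageAsync
             (serviceBusMessage, scheduledEnqueueTime);
           var responseModel = new ResponseModel()
           {

             Code = 200,
             Message = $"message scheduled:{scheduleSequence}"

           };

           // List<T> is not thread-safe and the continuations may run in parallel
           lock (responseModelsList)
           {
             responseModelsList.Add(responseModel);
           }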

         }).ToList();

      await Task.WhenAll(scheduledTasksList);

    }
    catch (ServiceBusException ex)
    {

      var responseModel = new ResponseModel()
      {

        Code = 400,
        Message = ex.Message

      };
      responseModelsList.Add(responseModel);

    }
    finally
    {
      await serviceBusSender.DisposeAsync();
    }

    return responseModelsList;

}

 

 

 

How to Receive Messages

ReadFromDeadLetterQueue

 

public async Task<IActionResult> ReadFromDeadLetterQueue
                                         (string queueNameString,
                                         [FromHeader] HeaderModel headerModel)
{

    kServiceBusClient = new ServiceBusClient(headerModel.ConnectionString);
    var deadLetterReceiver = kServiceBusClient.CreateReceiver(queueNameString,
        new ServiceBusReceiverOptions()
        {

          SubQueue = SubQueue.DeadLetter,
          ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete

        });

    MessageModel receivedModel = null;
    ErrorModel errorModel = null;

    try
    {

      var receivedMessage = await deadLetterReceiver.ReceiveMessageAsync(kWaitTimeSpan);
      if (receivedMessage == null)
        throw new ArgumentNullException(nameof(receivedMessage));

      receivedModel = JsonConvert.DeserializeObject<MessageModel>
        (Encoding.UTF8.GetString(receivedMessage.Body));
      if (receivedModel == null)
        throw new ArgumentNullException(nameof(receivedModel));

    }
    catch (ArgumentNullException ex)            
    {

      errorModel = new ErrorModel()
      {

        Code = 500,
        Message = ex.Message

      };

    }
    catch (ServiceBusException ex)
    {

      errorModel = new ErrorModel()
      {

        Code = 500,
        Message = ex.Message

      };
    }
    finally
    {

      await deadLetterReceiver.DisposeAsync();

    }

    return Ok((receivedModel != null) ? receivedModel : errorModel);

}

 

 

 

Service Bus Listener

 

// Service Bus SDK client
private ServiceBusClient _serviceBusClient;

// Message processor for Service Bus
private ServiceBusProcessorOptions _serviceBusProcessorOptions;
private ServiceBusProcessor _serviceBusProcessor;

// Session based Message processor for Service Bus
private ServiceBusSessionProcessorOptions _serviceBusSessionProcessorOptions;
private ServiceBusSessionProcessor _serviceBusSessionProcessor;

....
public MessageProcessor(string connectionString)
{

  _connectionString = connectionString;
  _serviceBusClient = new ServiceBusClient(connectionString);

}

.....

 

 

 

public async Task StartProcessingAsync(MessageProcessorCallback messageProcessorCallback)
{

  _serviceBusProcessor.ProcessMessageAsync += (ProcessMessageEventArgs processMessageEventArgs) =>            
  {

    messageProcessorCallback.Invoke(processMessageEventArgs, null);
    return Task.CompletedTask;

  };

  _serviceBusProcessor.ProcessErrorAsync += (ProcessErrorEventArgs processErrorEventArgs) =>
  {                

    messageProcessorCallback.Invoke(null, processErrorEventArgs);
    return Task.CompletedTask;

  };

  await _serviceBusProcessor.StartProcessingAsync();

}

 

 

 

 

Start Message Processing for a Session

 

public async Task StartSessionProcessingAsync (MessageSessionProcessorCallback
                                               messageSessionProcessorCallback)
{

    _serviceBusSessionProcessor.ProcessMessageAsync +=
    (ProcessSessionMessageEventArgs processSessionMessageEventArgs) =>
    {

      messageSessionProcessorCallback.Invoke(processSessionMessageEventArgs, null);
      return Task.CompletedTask;

    };

    _serviceBusSessionProcessor.ProcessErrorAsync +=
    (ProcessErrorEventArgs processErrorEventArgs) =>
    {               

      messageSessionProcessorCallback.Invoke(null, processErrorEventArgs);
      return Task.CompletedTask;

    };

    await _serviceBusSessionProcessor.StartProcessingAsync();

}

 

 

 

How to perform Transaction processing

ForwardToTopicAsync

 

public async Task<IActionResult> ForwardToTopicAsync
                                         (string topicNameString, string subscriptionNameString,
                                         [FromHeader] ForwardHeaderModel forwardHeaderModel,
                                         [FromQuery] Dictionary<string, string> queryStringMap)
{

    var serviceBusClientOptions = new ServiceBusClientOptions()
    {

      EnableCrossEntityTransactions = true,                
      TransportType = ServiceBusTransportType.AmqpTcp

    };

    kServiceBusClient = new ServiceBusClient(forwardHeaderModel.ConnectionString,
                                             serviceBusClientOptions);
    var serviceBusReceiverOptions = new ServiceBusReceiverOptions()
    {

      PrefetchCount = 2,
      ReceiveMode = ServiceBusReceiveMode.PeekLock              

    };

    ServiceBusReceiver serviceBusReceiver = null;
    ServiceBusSender nextHopSender = null;
    OCRModel receivedModel = null;
    ErrorModel errorModel = null;

    try
    {

      var sessionNameString = queryStringMap["session"];                
      var nextHopTopicNameString = forwardHeaderModel.NextHopTopicName;                
      var nextHopSessionNameString = forwardHeaderModel.NextHopSessionName;

      serviceBusReceiver = kServiceBusClient.CreateReceiver
        (topicNameString, subscriptionNameString,
         serviceBusReceiverOptions);
      nextHopSender = kServiceBusClient.CreateSender(nextHopTopicNameString);

      // The receiver was created just above, so no null-conditional call is needed here
      var receivedMessage = await serviceBusReceiver.ReceiveMessageAsync(kWaitTimeSpan);
      if (receivedMessage == null)
        throw new ArgumentNullException(nameof(receivedMessage));

      receivedModel = JsonConvert.DeserializeObject<OCRModel>
        (Encoding.UTF8.GetString(receivedMessage.Body));
      if (receivedModel == null)
        throw new ArgumentNullException(nameof(receivedModel));

      using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
      {

        await serviceBusReceiver.CompleteMessageAsync(receivedMessage);

        var serviceBusMessage = new ServiceBusMessage(receivedMessage);
        serviceBusMessage.TransactionPartitionKey = receivedMessage.PartitionKey;
        serviceBusMessage.SessionId = nextHopSessionNameString;

        await nextHopSender.SendMessageAsync(serviceBusMessage);
        ts.Complete();

      }
    }
    catch (ArgumentNullException ex)
    {

      errorModel = new ErrorModel()
      {

        Code = 400,
        Message = ex.Message

      };

    }
    catch (ServiceBusException ex)
    {

      errorModel = new ErrorModel()
      {

        Code = 500,
        Message = ex.Message

      };

    }
    finally
    {

      if (serviceBusReceiver != null)
        await serviceBusReceiver.DisposeAsync();

      if (nextHopSender != null)
        await nextHopSender.DisposeAsync();

    }

    return Ok((receivedModel != null) ? receivedModel : errorModel);

}

 

 

 

References

Posted at https://sl.advdat.com/3DWAUe1