Tuesday, June 28, 2011

Windows Azure AppFabric Service Bus Queues API

 

Recently Microsoft released a new App Fabric SDK 2.0 CTP including some great new features,

You can grab all the bits and pieces from here and/or read the release notes.

Some of the highlights include: 

  • Publish/Subscribe which are called Topics
  • Message Queues
  • Visual Studio Tools
  • AppFabric Application Manager
  • Support for running WCF & WF

The part that interested me the most is the Queues feature and that is what I’m going to be exploring in this post.

Overview of Queues

Message Queues are not a new concept allow for more reliable and scalable communication between distributed systems than pure request/response. Solutions like MSMQ, NServiceBus already exist to solve this problem for locally connected systems.  What the Queues API provides is that it provides similar features but the messages are being transported across the internet and persisted in the cloud.  

There is currently a Message Buffer available in Azure but this has serious limitations:

  • Messages only persisted for a maximum 10 minutes
  • Maximum of 50 messages
  • Requires the client to be connected
  • Charged per connection not per message
  • Complex API
  • Max message size of 8 KB

The Queues API addresses a number of these issues by adding.

  • Long term persistence (No clarification on the SLA yet)
  • Simpler API
  • Max message size of 256 KB
  • Sessions
  • Larger size of queue (100 MB currently but set to increase)

To start testing out the Queues feature you need to first Login to the AppFabricLabs portal and create a test Service Namespace.

Then you will need to add Microsoft.ServiceBus and Microsoft.ServiceBus.Messaging dlls to your project. They can be found in C:\Program Files\Windows Azure AppFabric SDK\V2.0\Assemblies\NET4.0

Wrapping It Up

Before starting on the implementation I’m going to define a simple interface around the ServiceBus bits and also add an IMessage interface which all Messages will need to implement.

IServiceBus

    public interface IServiceBus
    {
        void Send(string queueName, IMessage message);
        
        T Receive<T>(string queueName) where T : IMessage; 
    }

 

IMessage

Although not required by the API it’s good practice to have a unique identifier for each message.
When architecting message based systems you need to allow for idempotence meaning that an operation should be able to be executed multiple times without changing the result. Most message queue systems guarantee that the messages will be delivered “at least once” so you need to allow for this in your code and having an unique identifier on the message makes implementing this a trivial exercise.

    public interface IMessage
    {
        Guid Id { get; set; }
    }
Note: All messages must be Serializable

 

Authentication

In order to authenticate against the service bus you need three values:

  • Issuer Name
  • Issuer Key
  • Service Namespace

The service namespace is just the name which you created in the portal, in this example it’s “appfabricdemo1”.

The issuer name & key can be found under the Default Key section in the Portal.

default_key

 

There is a little bit of ceremony in setting up the correct objects but I’ve put this into a single method called InitClient which is called from the Send and Receive methods.

       private readonly string issuerKey;
       private readonly string issuerName;
       private readonly string serviceNamespace;
       private TransportClientCredentialBase clientCredentials;
       private MessagingFactory messagingFactory;
       private ServiceBusNamespaceClient namespaceClient;
       private IList<Queue> queueList; 

       public AzureServiceBus(string issuerName, string issuerKey, string serviceNamespace)
       {
           this.issuerName = issuerName;
           this.issuerKey = issuerKey;
           this.serviceNamespace = serviceNamespace;
       }
       private void InitClient()
       {
           clientCredentials = TransportClientCredentialBase
               .CreateSharedSecretCredential(issuerName, issuerKey);

           var uri = ServiceBusEnvironment
               .CreateServiceUri("https", serviceNamespace, string.Empty);
           
           var runtimeUri = ServiceBusEnvironment
               .CreateServiceUri("sb", serviceNamespace, string.Empty);

           namespaceClient = new ServiceBusNamespaceClient(uri, clientCredentials);

           messagingFactory = MessagingFactory.Create(runtimeUri, clientCredentials);
       }

The key classes here are:

 

Creating the Queue

Before we can start sending and receiving messages you first have to create a queue. The API doesn’t currently have a method to check if a queue exists already so it has to be hand rolled as calling CreateQueue throws an exception if it already exists.

This is fairly easily achieved with two simple methods

       private Queue GetOrCreateQueue(string path)
       {
           path = path.ToLower();

           var queues = GetQueues();

           var queueExists = queues.Any(q => q.Path == path);

           if (queueExists)
           {
               return queues.FirstOrDefault(q => q.Path == path);
           }

           var queue = namespaceClient.CreateQueue(path);

           queues.Add(queue);

           return queue;
       }

       private IList<Queue> GetQueues()
       {
           if (queueList != null)
           {
               return queueList;
           }

           queueList = new List<Queue>();
           var queues = namespaceClient.GetQueues();

           foreach (var queue in queues)
           {
               queueList.Add(queue);
           }

           return queueList;
       }

As the GetQueues method needs to make a remote call I keep an in-memory collection of the Queue objects so that it only calls the first time.

Note: Queue names are converted to lowercase when created.

Sending a Message

In order to send a message you need to create a QueueClient, MessageSender and then convert the message to a BrokeredMessage.

        public void Send(string queueName, IMessage message)
        {
            InitClient();

            var queue = GetOrCreateQueue(queueName);

            var queueClient = messagingFactory.CreateQueueClient(queue);

            using (var messageSender = queueClient.CreateSender())
            {
                var brokeredMessage = ConvertToBrokeredMessage(message);

                messageSender.Send(brokeredMessage);
            }

            queueClient.Close();
            messagingFactory.Close();
        }

        private static BrokeredMessage ConvertToBrokeredMessage(IMessage message)
        {
            var brokeredMessage = BrokeredMessage.CreateMessage(message);

            brokeredMessage.MessageId = message.Id.ToString();

            return brokeredMessage;
        }

 

The BrokeredMessage has a bunch of useful properties, most notable of which are:

  • ContentType
  • CorrelationId
  • DeliveryCount
  • Properties – which is a Dictionary<string, object>
  • MessageId

 

Receiving a Message

Receiving a message is much the same as sending a message in that it requires a QueueClient & MessageReceiver.

There are two different modes when receiving a message.
ReceiveAndDelete which deletes the message immediately after reading or PeekLock which only locks the message and leaves you to manage the delete.

       public T Receive<T>(string queueName) where T : IMessage
       {
           InitClient();

           var queue = GetOrCreateQueue(queueName);

           var queueClient = messagingFactory.CreateQueueClient(queue);

           queueClient.CreateReceiver();

           BrokeredMessage brokeredMessage;

           using (var messageReceiver = queueClient.CreateReceiver(ReceiveMode.ReceiveAndDelete))
           {
               brokeredMessage = messageReceiver.Receive();
           }

           if (brokeredMessage == null)
           {
               return default(T);
           }

           queueClient.Close();
           messagingFactory.Close();

           var message = brokeredMessage.GetBody<T>();

           return message;
       }

 

The Demo App

I wanted to pull this together and so created an example application which is two console apps one being the client and one being the server. The server sends a message to the client on one queue and when the client receives the message it sends a reply message on another queue.

Server

    internal class Program
    {
        private static void Main(string[] args)
        {
            Console.WriteLine("Welcome to the Queue Demo Server");
            Console.WriteLine("Press any key to start:");
            Console.ReadLine();


            var serverQueue = "serverqueue";
            var clientQueue = "clientqueue"; 

            var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
                ConfigurationManager.AppSettings["IssuerKey"], 
                ConfigurationManager.AppSettings["ServiceNamespace"]);

            var message = new TestMessage("I've travelled along way just to get here.");

            
            serviceBus.Send(serverQueue, message);

            Console.WriteLine("Sent Message:" + message.Id + " at " + DateTime.Now);


            while (true)
            {
                message = serviceBus.Receive<TestMessage>(clientQueue);

                if (message == null || message.Id == Guid.Empty)
                {
                    Console.WriteLine("No messages found yet I'll keep trying");
                }
                else
                {
                    Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                    Console.WriteLine(message.Message);
                    break;
                }
            }


            Console.ReadLine();
        }
    }
server_console

Client

   internal class Program
   {
       private static void Main(string[] args)
       {
           Console.WriteLine("Welcome to the Queue Demo Client");
           Console.WriteLine("Press any key to start:");
           Console.ReadLine();
           
           var serverQueue = "serverQueue";
           var clientQueue = "clientQueue"; 

           var serviceBus = new AzureServiceBus(ConfigurationManager.AppSettings["IssuerName"], 
               ConfigurationManager.AppSettings["IssuerKey"], 
               ConfigurationManager.AppSettings["ServiceNamespace"]);


           while (true)
           {
               var message = serviceBus.Receive<TestMessage>(serverQueue);

               if (message == null || message.Id == Guid.Empty)
               {
                   Console.WriteLine("No messages found yet I'll keep trying"); 
               }
               else
               {
                   Console.WriteLine("Read Message: " + message.Id + " at " + DateTime.Now);
                   Console.WriteLine(message.Message);
                   break;
               }
           }


           var responseMessage = new TestMessage("And so have I");

           serviceBus.Send(clientQueue, responseMessage);

           Console.WriteLine("Sent Message:" + responseMessage.Id + " at " + DateTime.Now);



           Console.ReadLine();
       }
   }

client_console

 

Fiddler

If you’re curious you can see what’s going on under the covers using Fiddler.

fiddler

 

API Change Requests

Having played around with the API it’s pretty good and much better than the MessageBuffer API which is really awkward.

I’d like to see on the ServiceBusNamespaceClient a method that tells you if a Queue exists or not, something like:

bool QueueExists(string path)

Currently MessagingFactory, ServiceBusNamespaceClient, QueueClient do not implement interfaces which makes it much harder to Unit Test. Creating an abstracting around these classes would be a big win IMO.

 

Conclusion

All in all the Queue API is a much needed and welcomed addition to the Azure offerings and greatly simplifies messaging communication between not always connected clients and the server. It also appears to be very fast as well.

Grab the code.

In my next post I’m going be looking at the Publish/Subscribe (Topics) features.

2 comments:

  1. Hi There, I have created an xml file that I am pushing into a que its not that big, its a 100kb XML file that I convert to a string, but it doesnt make it to the recievers, if I keep the string length lower (sorry not tested how long exactly) then its recieved fine, do you know what the correct way to do this is with a package say 500kb - 1mb message.

    ReplyDelete
  2. All the documentation states that 64KB is the maximum message size. 
    http://msdn.microsoft.com/en-us/library/dd179363.aspx
    If you're trying to pass around messages bigger than this then you would need to use a different  mechanism. Why not pass a lightweight message with a Key that can be used to callback to a Web Service from the client to get the full payload?

    ReplyDelete