Long Running Queue Processing (Year of Azure Week 3)

Sitting in Denver International Airport as I write this. I doubt I’ll have time to finish it before I have to board, and my portable “Azure Appliance” (what a few of us have started calling my 8lb, 17” wide-screen laptop) simply isn’t good for working on the plane. I had hoped to write this while working in San Diego this week, but simply didn’t have the time.

And no, I wasn’t at Comic Con. I was in town visiting a client for a few days of cloud discovery meetings. The closest I came was a couple of blocks away, plus bumping into two young girls in my hotel dressed as the “Black Swan” and Snookie from True Blood. But I’m rambling now... fatigue, I guess.

Anyways, a message came across on a DL I’m on about long running processing of Azure Storage queue messages. And while unfortunately there’s no way to renew the “lease” on a message, it did get me to thinking that this week my post would be about one of the many options for processing a long running queue message.

My approach will only work if the long running process takes less than the maximum visibilitytimeout value allowed for a queue message, namely 2 hours (thanks to Mike Collier for pointing out my typo). Essentially, I’ll spin the message processing off into a background thread and then monitor that thread from the foreground. This approach would allow me to multi-thread the processing of my messages (which I won’t demo this week because of lack of time). It also allows me to monitor the processing more easily, since I’m doing it from outside of the process.

Setting up the Azure Storage Queue

It’s been a long time since I’ve published an Azure Storage queue sample, and the last time I messed with them I was hand coding against the REST API. This time we’ll use the StorageClient library.

    // requires: using Microsoft.WindowsAzure;
    //           using Microsoft.WindowsAzure.StorageClient;

    // create storage account and client
    var account = CloudStorageAccount.FromConfigurationSetting("AzureStorageAccount");
    CloudQueueClient qClient = account.CreateCloudQueueClient();
    // create a queue object to use to manipulate the queue
    CloudQueue myQueue = qClient.GetQueueReference("tempqueue");
    // make sure our queue exists
    myQueue.CreateIfNotExist();

Pretty straightforward code here. We get our configuration setting and create a CloudQueueClient that we’ll use to interact with our Azure Storage service. I then use that client to create a CloudQueue object, which we’ll use to interact with our specific queue. Lastly, we create the queue if it doesn’t already exist.

Note: I wish the Azure AppFabric queues had this “Create If Not Exist” method. *sigh*

Inserting some messages

Next up we insert some messages into the queue so we’ll have something to process. That’s even easier….

    // insert a few (5) messages
    int iMax = 5;
    for (int i = 1; i <= iMax; i++)
    {
        CloudQueueMessage tmpMsg = new CloudQueueMessage(string.Format("Message {0} of {1}", i, iMax));
        myQueue.AddMessage(tmpMsg);
        Trace.WriteLine("Wrote message to queue.", "Information");
    }

I insert 5 messages numbered 1-5 using the CloudQueue AddMessage method. I should really trap for exceptions here, but this works for demonstration purposes. Now for the fun part...

Setting up our background work process

Ok, now we have to set up an object that we can use to run the processing on our message. Stealing some code from the MSDN help files, and making a couple of minor mods, we end up with this…

    // requires: using System.Diagnostics; using System.Threading;
    //           using Microsoft.WindowsAzure.StorageClient;
    class Work
    {
        public void DoWork()
        {
            try
            {
                int i = 0;

                // simulate 30 seconds of processing, one second per pass
                while (!_shouldStop && i < 30)
                {
                    Trace.WriteLine("worker thread: working…");
                    i++;
                    Thread.Sleep(1000); // sleep for 1 sec
                }
                Trace.WriteLine("worker thread: terminating gracefully.");
                // report success only if we ran to completion
                // rather than being asked to stop early
                isFinished = !_shouldStop;
            }
            catch
            {
                // we should do something here
                isFinished = false;
            }
        }

        public CloudQueueMessage Msg;
        public bool isFinished = false;

        public void RequestStop()
        {
            _shouldStop = true;
        }
        // Volatile is used as hint to the compiler that this data
        // member will be accessed by multiple threads.
        private volatile bool _shouldStop;
    }

The key here is the DoWork method. This one is set up to take 30 seconds to “process” a message. We also have the ability to abort processing using the RequestStop method. It’s not ideal, but this will get our job done. We really need something more robust in the catch block, but at least we catch any errors and indicate that processing failed.

Monitoring Processing

Next up, we need to launch our processing and monitor it.

    // aMsg is the CloudQueueMessage previously retrieved from myQueue
    // start processing of message
    Work workerObject = new Work();
    workerObject.Msg = aMsg;
    Thread workerThread = new Thread(workerObject.DoWork);
    workerThread.Start();

    // poll until the worker thread has ended
    while (workerThread.IsAlive)
    {
        Thread.Sleep(100);
    }

    if (workerObject.isFinished)
        myQueue.DeleteMessage(aMsg.Id, aMsg.PopReceipt); // I could just use the message, illustrating a point
    else
    {
        // here, we should check the message's dequeue count
        // and move the msg to a poison message queue
    }

Simply put, we create our worker object, give it a parameter (our message) and create/launch a thread to process the message. We then use a while loop to monitor the thread. When it’s complete, we check isFinished to see if processing completed successfully.
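One gap in the monitoring loop above is that it will wait forever, while the message only stays invisible for as long as its visibility timeout. Here is a minimal sketch of a deadline-aware monitor. It isn’t part of the posted sample: SimulatedWork, DeadlineMonitor, RunWithDeadline and the timings are hypothetical names and values I made up for illustration, and the worker fakes its processing with Thread.Sleep just like the Work class does.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the Work class: "processes" in 100 ms steps.
class SimulatedWork
{
    private readonly int _steps;
    private volatile bool _shouldStop;
    private volatile bool _isFinished;

    public SimulatedWork(int steps) { _steps = steps; }

    public bool IsFinished { get { return _isFinished; } }

    public void RequestStop() { _shouldStop = true; }

    public void DoWork()
    {
        int i = 0;
        while (!_shouldStop && i < _steps)
        {
            Thread.Sleep(100); // simulated unit of work
            i++;
        }
        // success only if every step actually ran
        _isFinished = (i == _steps);
    }
}

static class DeadlineMonitor
{
    // Runs the work on a background thread and returns true if it ran to
    // completion. The deadline is assumed to be chosen somewhat shorter
    // than the message's visibility timeout.
    public static bool RunWithDeadline(SimulatedWork work, TimeSpan deadline)
    {
        Thread worker = new Thread(work.DoWork);
        worker.Start();

        // Join(TimeSpan) blocks until the thread ends or the time elapses.
        if (!worker.Join(deadline))
        {
            work.RequestStop(); // cooperative stop request
            worker.Join();      // wait for the worker to notice and exit
        }
        return work.IsFinished;
    }
}
```

The caller would delete the message when RunWithDeadline returns true, and otherwise leave it to reappear on the queue or move it to a poison queue. Using Join with a timeout also replaces the Sleep(100) polling loop, which is a design choice rather than a requirement.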

Next Steps

This example is pretty basic, but I hope you get the gist of it. In reality, I’d probably be spinning up multiple workers and throwing them into a collection so that I can then monitor multiple processing threads. As more messages come in, I can put them into the collection in place of threads that have finished. I could also have the worker process itself do the deletion if the message was successfully processed. There are lots of options.
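To make the “collection of workers” idea a bit more concrete, here’s a rough sketch of what it might look like. Again, this is not in the posted sample: MessageWorker, WorkerPool and ProcessAll are hypothetical names, plain strings stand in for CloudQueueMessage objects, and an in-memory Queue<string> stands in for the Azure Storage queue so the sketch runs without a storage account.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical worker: holds a message body (a plain string standing in
// for a CloudQueueMessage) and simulates processing it on its own thread.
class MessageWorker
{
    public readonly string Body;
    public readonly Thread WorkerThread;
    private volatile bool _isFinished;

    public bool IsFinished { get { return _isFinished; } }

    public MessageWorker(string body)
    {
        Body = body;
        WorkerThread = new Thread(DoWork);
    }

    private void DoWork()
    {
        Thread.Sleep(200); // simulated processing
        _isFinished = true;
    }
}

static class WorkerPool
{
    // Processes every pending body using at most maxWorkers threads at a
    // time and returns the bodies that were processed successfully.
    public static List<string> ProcessAll(Queue<string> pending, int maxWorkers)
    {
        if (maxWorkers < 1)
            throw new ArgumentOutOfRangeException("maxWorkers");

        List<MessageWorker> active = new List<MessageWorker>();
        List<string> completed = new List<string>();

        while (pending.Count > 0 || active.Count > 0)
        {
            // fill any free slots with new workers
            while (active.Count < maxWorkers && pending.Count > 0)
            {
                MessageWorker w = new MessageWorker(pending.Dequeue());
                w.WorkerThread.Start();
                active.Add(w);
            }

            // harvest workers whose threads have ended
            for (int i = active.Count - 1; i >= 0; i--)
            {
                if (!active[i].WorkerThread.IsAlive)
                {
                    if (active[i].IsFinished)
                        completed.Add(active[i].Body); // real code would delete the message here
                    active.RemoveAt(i);
                }
            }

            Thread.Sleep(50); // avoid a tight polling loop
        }
        return completed;
    }
}
```

Only the monitoring thread touches the two lists, so they need no locking. The one value shared across threads is each worker’s finished flag, which is why it is marked volatile.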

But I need to get rolling, so we’ll have to call this a day. I’ve posted the code for this sample if you want it; just use the link below. So enjoy! And if we get the ability to renew the “lease” on a message, I’ll try to swing back by this and put in some updates.

https://skydrive.live.com/embedicon.aspx/.Public/Year%20of%20Azure/YOA%20-%20Week%203.zip?cid=61aea8168d26ea6b&sc=documents
