Azure Logic Apps, Functions, and Service Bus

Here we are yet again: me writing something on this blog, if for no other reason than to document something I learned. There’s no real narrative behind this one, other than that I built another POC for a partner and in the process found some things I wanted to pull together.

The story here is about digging beyond the Logic App designer and interacting with Service Bus queues, topics, and event hubs. That means accessing and manipulating message properties as we start chaining Logic App workflows together with functions and custom code.

Since we’re going beyond what the designer currently supports, we’ll look exclusively at the “code view” for everything.

Sending to a Queue

The first step was to create a workflow that would accept an HTTP request and use that to create a message in an Azure Service Bus queue. In doing this ‘simple’ task I learned two things: how to compose an object and how to set a custom message property.

The compose action allows you to construct a JSON object from various inputs. I wanted to be able to send a message to a queue for further processing as well as to an event hub for logging, so being able to compose the object once and reuse it was VERY handy.

 
"ComposeJobMsg": {
   "inputs": {
       "JobID": "@{body('SaveJobtoDatabase')?['OutputParameters']['JobID']}",
       "customer": "@{triggerBody()?['customer']}",
       "job_payload": "@triggerBody()?['job_payload']",
       "job_type": "@{triggerBody()?['job_type']}"
   },
   "runAfter": {
       "SaveJobtoDatabase": [
         "Succeeded"
       ]
   },
   "type": "Compose"
}  

This action takes input from the workflow trigger and the result of a previous stored procedure action, SaveJobtoDatabase, and constructs a simple JSON object with four properties (with horribly inconsistent naming conventions, I know).

With the message object created, I can now send it to a queue, specifying the output of the compose operation as the ContentData for my queue message. The SendToQueue action’s body looks like this:

"body": {
   "ContentData": "@{encodeBase64(string(outputs('ComposeJobMsg')))}",
   "ContentType": "JSON",
   "Properties": {
       "job_type": "@{triggerBody()?['job_type']}"
   }
}

There are a few things going on here I want to point out. We’re taking the output of the compose action, outputs(‘ComposeJobMsg’), and converting it from a JSON object to a string. We then base64 encode that string to ensure it will survive transport through the queue. We’re also starting the ContentData value with ‘@{’ to designate that we’re using an expression and that we want its result treated as a string. Since encodeBase64 already returns a string, using ‘{’ to tell the Logic App that it’s a string is unnecessary here, but sometimes it’s nice to err on the side of caution. You can learn more about the use of expressions like ‘@’ and ‘{’ in the Workflow Definition Language documentation.

Next up, we make sure to set the ContentType as “JSON”. And finally I add a custom property, “job_type”, and set its value to the parameter that was on the workflow trigger (again treating that value as a string).
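To make the difference between ‘@’ and ‘@{’ a bit more concrete, here is a small sketch of a compose action that uses both forms against the same trigger property. The action name ExpressionForms and the three property names inside inputs are made up for illustration, and the behavior described is simply how I understand the Workflow Definition Language to treat each form.

```json
"ExpressionForms": {
   "inputs": {
       "payload_as_object": "@triggerBody()?['job_payload']",
       "payload_as_string": "@{triggerBody()?['job_payload']}",
       "literal_text": "@@triggerBody() is not evaluated"
   },
   "runAfter": {},
   "type": "Compose"
}
```

The first value keeps whatever JSON type job_payload had on the trigger (object, number, and so on). The second is always interpolated into a string. The third uses ‘@@’ to escape the expression marker, so it should come through as literal text starting with a single ‘@’.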

Queue as a Trigger

This is where things started to get interesting. I created a second workflow that is triggered “when a message is received” and set it to run at 30 second intervals. But this created a problem when trying to update the workflow. Currently (this is something that’s being worked on), the Logic Apps connector takes advantage of Service Bus Queues’ long polling capabilities. Long polling is great because it helps reduce the latency between when a message arrives and when it can be processed. So even though the workflow was set to check every 30 seconds… when a message was sent to the queue, it triggered the workflow almost immediately.

The reason for this is that the workflow is not actually running at 30 second intervals. Instead it starts polling the queue and waits for that to time out, then it’ll wait 30 seconds and poll again. Where this creates an issue is that if you are in active development, you’re likely going to be changing the workflow every few minutes. Tweak this… run a test… adjust that, run a test. When the workflow starts polling, it’s going to wait about 10 minutes for that operation to time out. So even though the “save” works fine, any changes you’re making won’t take effect until the next time the workflow is triggered (after the long polling times out).

The recommendation I was given, which worked really well (thanks Jeff), is to change the interval to something like once a day. Then via the portal, we can use the “run trigger” feature to kick off a one-time run of the workflow. So what I would do is modify the workflow, submit a test message to the queue, then manually trigger it. Admittedly, it’s not as smooth as I’d like, but it gets the job done. The product team seems aware of this, so I’m hopeful this workaround won’t be needed for the long term. Once development is complete, the “production” version of the workflow can use a normal timing setting, as long as we’re aware of and OK with the long polling behavior.
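In code view, the once-a-day workaround comes down to the recurrence block on the trigger. The sketch below is only meant to show where that block lives. The trigger name, the ‘jobqueue’ queue name, the connection reference, and the path are assumptions that follow the general shape the designer generates, so copy those parts from your own workflow rather than from here.

```json
"triggers": {
   "When_a_message_is_received_in_a_queue": {
       "inputs": {
           "host": {
               "connection": {
                   "name": "@parameters('$connections')['servicebus']['connectionId']"
               }
           },
           "method": "get",
           "path": "/@{encodeURIComponent('jobqueue')}/messages/head"
       },
       "recurrence": {
           "frequency": "Day",
           "interval": 1
       },
       "type": "ApiConnection"
   }
}
```

When development is done, switching frequency back to something like “Second” with an interval of 30 should restore the original polling schedule.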

Accessing Queue Message Properties

I wanted to be able to consume events both via a Logic App workflow and from some C# code. The workflow portion would look at the job_type property I set above, and use that in a condition to control routing of the message to another queue. If you’re using the drag/drop workflow designer, it’s pretty easy to get at the queue message ContentData. If you click on it in the designer, the code behind will insert something like this:

triggerBody()['ContentData']

Something was pointed out to me (thanks again Jeff!) and the lightbulb went on. Note that ContentData is the exact same property we set when we sent the message to the queue up above. So if we want to access the job_type value we set, we simply access the Properties collection like so:

triggerBody()?['Properties']['job_type']

You can’t currently do this via the designer, so you’ll need to flip over to code view if you want to access the individual properties within the Properties collection.
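As a rough sketch of how that property could drive the routing condition, here is a condition action in code view. The action names, the ‘import’ job type value, and the placeholder compose actions in each branch are all hypothetical; in a real workflow each branch would hold the action that sends the message on to the appropriate queue. Depending on the designer version, the condition may be written out in a different but equivalent shape.

```json
"RouteByJobType": {
   "actions": {
       "HandleImportJob": {
           "inputs": "@triggerBody()?['Properties']?['job_type']",
           "runAfter": {},
           "type": "Compose"
       }
   },
   "else": {
       "actions": {
           "HandleOtherJob": {
               "inputs": "@triggerBody()?['ContentData']",
               "runAfter": {},
               "type": "Compose"
           }
       }
   },
   "expression": "@equals(triggerBody()?['Properties']?['job_type'], 'import')",
   "runAfter": {},
   "type": "If"
}
```

The extra ‘?’ before ['job_type'] is there so that a message without that property evaluates to null instead of failing the expression.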

But what if we want to get at the actual payload of ContentData? The JSON object is there; we just have to reverse what we did when we put it into the message. We’ll use a couple of Workflow Definition Language functions to undo the base64 encoding and get the string content. That string is JSON, so we use the json function to convert it to an object. Once it’s an object again, we can access any properties within it, such as the JobID we set when we composed the original object.

@json(base64toString(triggerBody()['ContentData']))['JobID']
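If several actions need values from the payload, one option is to decode it once in a compose action and reference that output afterwards, instead of repeating the full expression each time. This is just a sketch; the action name DecodeJobMsg is made up for illustration.

```json
"DecodeJobMsg": {
   "inputs": "@json(base64ToString(triggerBody()?['ContentData']))",
   "runAfter": {},
   "type": "Compose"
}
```

Any action that runs after it could then read a single value with an expression such as @outputs('DecodeJobMsg')?['JobID'].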

In C#, if we want to get at the contents of our object, we do a similar process to get the body of the BrokeredMessage object, and transform that JSON payload into an object.

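// needs: System.IO, Microsoft.ServiceBus.Messaging, Newtonsoft.Json
// get the message body as a stream and read it as text
string jsonJob;
using (var reader = new StreamReader(message.GetBody<Stream>(), true))
{
    jsonJob = reader.ReadToEnd();
}

// convert message body to a dynamic object
dynamic job = JsonConvert.DeserializeObject(jsonJob);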

What about Event Hub?

There isn’t a connector for Event Hub (at least as of the authoring of this post). So I created an Azure Function (code on GitHub) to do this for me. It accepts a few parameters and puts them into the event hub so I can later process them via Stream Analytics. Calling it from the workflow was then pretty straightforward.

"LogToEventHub": {
   "inputs": {
       "body": {
           "JobID": "@{json(base64toString(triggerBody()['ContentData']))['JobID']}",
           "customer": "@{json(base64toString(triggerBody()['ContentData']))['customer']}",
           "job_payload": "@string(outputs('ComposeLogMsg'))",
           "status": "routing"
       },
       "function": {
           "id": "<insert your function reference here>"
       }
   },
   "runAfter": {
       "ComposeLogMsg": [
           "Succeeded"
       ]
   },
   "type": "Function"
}

Just like when we were sending content to the queue, make sure you know what format the objects going into the event hub should be in. My function is called via HTTP, so the parameters in the body need to be strings. I opted to use compose to create the payload, then convert that to a string to be output to the event hub. Make sure you know what you’re passing and how it needs to be passed, as forgetting the proper ‘{’ or ‘@’ can cause a real headache.

One final Gotcha, WebJobs

Now this one is a truly personal note. When you add an Azure Function to a resource group, it’s currently a valid target for a VSTS web publish. In fact, if you look at the Function in the portal, you can click an option to view the hosting Web App. Once in that web app, you can see any web jobs you may or may not have accidentally deployed to the wrong location (yeah, it happened). I’ve been told that this will eventually be disabled. But in the interim, I wanted to share this little tidbit so nobody else wastes a late night hour trying to figure out why event messages are being consumed when the web jobs that were processing them are all stopped (or so you thought).

Lesson learned. 🙂

Until next time!
