Azure Logic Apps, Functions, and Service Bus

Here we are yet again. Me writing something on this blog, if for no other reason than to document something I learned. There’s no real narrative behind this one other than I built another POC for a partner and in the process found some things I wanted to pull together.

The story here is about digging beyond the Logic App designer and interacting with Service Bus queues, topics, and event hubs: accessing and manipulating the message properties as we start chaining Logic App workflows together with functions and custom code.

Since we’re going beyond what the designer currently supports, we’ll look exclusively at the “code view” for everything.

Sending to a Queue

The first step was to create a workflow that would accept an HTTP request and use that to create a message in an Azure Service Bus queue. In doing this ‘simple’ task I learned two things: how to compose an object and how to set a custom message property.

The compose action allows you to construct a JSON object from various inputs. I wanted to be able to send a message to a queue for further processing as well as to event hub for logging. So being able to compose the object once and reuse it was VERY handy.

 
"ComposeJobMsg": {
   "inputs": {
       "JobID": "@{body('SaveJobtoDatabase')?['OutputParameters']['JobID']}",
       "customer": "@{triggerBody()?['customer']}",
       "job_payload": "@triggerBody()?['job_payload']",
       "job_type": "@{triggerBody()?['job_type']}"
   },
   "runAfter": {
       "SaveJobtoDatabase": [
         "Succeeded"
       ]
   },
   "type": "Compose"
}  

This action takes input from the workflow trigger and the result of a previous stored procedure, SaveJobtoDatabase, and constructs a simple JSON object with four properties (with horribly inconsistent naming conventions, I know).

With the message object created, I can now send it to a queue, specifying the output of the compose operation as the ContentData for my queue message. The SendToQueue action’s body looks like this:

"body": {
   "ContentData": "@{encodeBase64(string(outputs('ComposeJobMsg')))}",
   "ContentType": "JSON",
   "Properties": {
       "job_type": "@{triggerBody()?['job_type']}"
   }
}

There are a few things going on here I want to point out. We’re taking the output of the compose action, outputs(‘ComposeJobMsg’), and converting it from a JSON object to a string. We then base64 encode that string to ensure it will survive transport through the queue. We’re also starting the ContentData value with ‘@{‘ to designate that we’re using a parameter value and that we want to treat it as a string. Using ‘{‘ to inform the Logic App that it’s a string is unnecessary, but sometimes it’s nice to err on the side of caution. You can learn more about the use of expressions like ‘@‘ and ‘{‘ in the Workflow Definition Language documentation.
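
For example, here’s a contrived pair of compose inputs (the property names are just for illustration) that shows the difference: the first form always produces a string, while the second passes the value through with whatever type it already has.

"job_type_as_string": "@{triggerBody()?['job_type']}",
"job_type_as_is": "@triggerBody()?['job_type']"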

Next up, we make sure to set the ContentType as “JSON”. And finally I add a custom property, “job_type”, and set its value to a parameter from the workflow trigger (again treating that value as a string).

Queue as a Trigger

This is where things started to get interesting. I created a second workflow that is triggered “when a message is received” and set it to run at 30 second intervals. But this created a problem when trying to update the workflow. Currently (this is something that’s being worked on), the Logic Apps connector takes advantage of Service Bus Queues’ long polling capabilities. Long polling is great because it helps reduce the latency between when a message arrives and when it can be processed. So even though the workflow was set to check every 30 seconds… when a message was sent to the queue, it triggered the workflow almost immediately.

The reason for this is that the workflow is not actually running at 30 second intervals, but instead starts polling the queue and waits for that to time out, then it’ll wait 30 seconds and poll again. Where this creates an issue is that if you are in active development, you’re likely going to be changing the workflow every few minutes. Tweak this… run a test… adjust that, run a test. When the workflow starts polling, it’s going to wait about 10 minutes for that operation to time out. So even though the “save” works fine, any changes you’re making won’t take effect until the next time the workflow is triggered (after the long polling times out).

The recommendation I was given, which worked really well (thanks Jeff), is to change the interval to something like once a day. Then via the portal, we can use the “run trigger” feature to kick off a one-time run of the workflow. So what I would do is modify the workflow, submit a test message to the queue, then manually trigger it. Admittedly, it’s not as smooth as I’d like, but it gets the job done. The product team seems aware of this so I’m hopeful this workaround won’t be needed for the long term. Once development is complete, the “production” version of the workflow can use a normal timing setting, as long as we’re aware of and OK with the long polling behavior.
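
In code view, the workaround just means dialing the trigger’s recurrence way back. A sketch of what that looks like is below; the trigger name is illustrative and I’ve left out the connection inputs:

"triggers": {
    "When_a_message_is_received_in_a_queue": {
        "recurrence": {
            "frequency": "Day",
            "interval": 1
        },
        "type": "ApiConnection"
    }
}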

Accessing Queue Message Properties

I wanted to be able to consume events both via a Logic App workflow and from some C# code. The workflow portion would look at the job_type property I set above and use that in a condition to control routing of the message to another queue. If you’re using the drag/drop workflow designer, it’s pretty easy to get at the queue message ContentData. If you click on it in the designer, the code behind will insert something like this:

triggerBody()['ContentData']

Something was pointed out to me (thanks again Jeff!) and the lightbulb went off. Note that ContentData is the exact same property we set when we sent the message to the queue up above. So if we wanted to access the job_type value we set, we simply access the Properties collection like so:

triggerBody()?['Properties']['job_type']

You can’t currently do this via the designer, so you’ll need to flip over to code view if you want to access the individual properties within the Properties collection.
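
As a sketch (the ‘batch’ value and the action name are made up, and the routing actions are omitted), a condition in code view that routes on that property could look something like this:

"Route_on_job_type": {
    "type": "If",
    "expression": "@equals(triggerBody()?['Properties']['job_type'], 'batch')",
    "actions": {},
    "runAfter": {}
}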

But what if we want to get at the actual payload of ContentData? The JSON object is there, we just have to reverse what we did when we put it into the message. We’ll use a couple of Workflow Definition Language functions to undo the base64 encoding and get the string content. That string is JSON, so we use the json method to convert it to an object. Once it’s an object again, we can access any properties within it, such as the JobID we set when we composed the original object.

@json(base64toString(triggerBody()['ContentData']))['JobID']

In C#, if we want to get at the contents of our object, we do a similar process to get the body of the BrokeredMessage object, and transform that JSON payload into an object.

// get the message body (message is the BrokeredMessage received from the queue)
var body = message.GetBody<Stream>();
string jsonJob = new StreamReader(body, true).ReadToEnd();

// convert message body to object
dynamic job = JsonConvert.DeserializeObject(jsonJob);
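
And if we want the custom property we set on the message earlier, it’s sitting in the BrokeredMessage’s Properties collection (the values come back as object, so cast accordingly):

// read the custom property we set when sending the message to the queue
string jobType = message.Properties["job_type"] as string;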

What about Event Hub?

There isn’t a connector for Event Hub (at least as of the authoring of this post). So I created an Azure function (code on github) to do this for me. It accepted a few parameters and put them into the event hub so I could later process them via Stream Analytics. Calling it from the workflow was then pretty straightforward.

"LogToEventHub": {
   "inputs": {
       "body": {
           "JobID": "@{json(base64toString(triggerBody()['ContentData']))['JobID']}",
           "customer": "@{json(base64toString(triggerBody()['ContentData']))['customer']}",
           "job_payload": "@string(outputs('ComposeLogMsg'))",
           "status": "routing"
       },
       "function": {
           "id": "<insert your function reference here>"
       }
   },
   "runAfter": {
       "ComposeLogMsg": [
           "Succeeded"
       ]
   },
   "type": "Function"
}
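
For reference, the function behind that call might look something like the sketch below. This is not the actual code from GitHub, just a minimal HTTP-triggered csx function that takes the request body and forwards it to an event hub (the connection string and event hub name placeholders are mine):

#r "Microsoft.ServiceBus"

using System.Net;
using System.Text;
using Microsoft.ServiceBus.Messaging;

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
    // take the JSON body as-is and send it to the event hub
    string payload = await req.Content.ReadAsStringAsync();

    var client = EventHubClient.CreateFromConnectionString("{your event hub connection string}", "{your event hub name}");
    client.Send(new EventData(Encoding.UTF8.GetBytes(payload)));

    return req.CreateResponse(HttpStatusCode.OK);
}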

Just like when we were sending content to the queue, make sure you know what format the objects going into the event hub should be in. My function is called via HTTP, so the parameters in the body need to be strings. I opted to use compose to create the payload, then convert that to a string to be output to the event hub. Make sure you know what you’re passing and how it needs to be formatted, as forgetting the proper ‘{‘ or ‘@’ can cause a real headache.
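
The ComposeLogMsg action referenced above isn’t shown in the snippet; a sketch of it might simply re-hydrate the original job object from ContentData so it can be stringified for the event hub:

"ComposeLogMsg": {
    "inputs": "@json(base64toString(triggerBody()['ContentData']))",
    "runAfter": {},
    "type": "Compose"
}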

One final gotcha: WebJobs

Now this one is a truly personal note. When you add an Azure Function app to a resource group, it’s currently a valid target for a VSTS web publish. In fact, if you look at the Function in the portal, you can click an option to view the hosting Web App. Once in that web app, you can see any web jobs you may (or may not) have accidentally deployed to the wrong location (yeah, it happened). I’ve been told that this will eventually be disabled. But in the interim, I wanted to share this little tidbit so nobody else wastes a late night hour trying to figure out why event messages are being consumed when the web jobs that were processing them are all stopped (or so you thought).

Lesson learned. 🙂

Until next time!

Extending Logic Apps with Azure Functions

Note: This article is based on services that were in preview at the time it was written. The user experience and any issues/challenges mentioned below may differ from what was eventually released for general availability.

I remember when Azure was only three services. Technically, it was four, but let’s not mince details. Today, there are dozens of services, each with a multitude of features. So it’s impossible to be an expert in them all. As a technology specialist, an architect, I try to understand the basics of most of them, but only really go deep in areas where I have a specific interest. But from time to time, the partners I work with have asks that require me to go into areas I would have otherwise skimmed over.

One such request was to help a partner solve a challenge they were facing with correlating transactions being sent to them by an upstream solution provider they were working with. A simple enough request on the surface, but in this case my partner also had a short runway to implement this. As such, they wanted to avoid having to do a large amount of coding. They wanted to embrace the speed and flexibility that PaaS gives you.

They looked at Logic Apps, a spiritual successor to BizTalk orchestrations. Unfortunately, Logic Apps doesn’t currently have “message box” type of functionality that would allow for the correlation of multiple messages. However, Logic Apps do have the ability to integrate with another service, Azure Functions, which allows us to extend the features of Logic Apps through custom code.

So I set about creating a simple proof of concept to prove out how something like this could work.

Overview of the solution

The workflow would happen in two parts. The first step would be to correlate the two inbound transactions we’d receive that together comprise a “complete order”. A second workflow would then be triggered to monitor an installation process that would occur.

For the first workflow, I wanted to keep it simple. My POC used two transactions that were the same format, each containing a customer name, and a product type. The workflow would receive them via a REST API call. Each transaction would then be written to an Azure SQL DB and the workflow would respond back to the requestor with a “success” response code (200). After this, the workflow would call out to an Azure function to determine if the transaction was complete. If it was, we would drop a message into a queue that would trigger the second half of the workflow. If it wasn’t, we’d just end there and wait for the next transaction to come in. Here’s what the first workflow would look like in Azure Logic Apps.

[Image: the first workflow in the Logic App designer]

The second part we’ll discuss later. But these simple steps would give me my “message box”.

Now this article won’t cover getting started with Logic Apps or Functions. There’s enough written on these subjects already (IMHO) that I don’t feel I have anything new to add. So if you haven’t worked with these before, I highly recommend at least covering their “getting started” materials.

Triggering the workflow

Logic Apps gives you a multitude of “triggers” that can be used to start a workflow. For this proof of concept, I opted to trigger the workflow manually when an HTTP request is received. Since it’s just a POC, I don’t bother to secure this endpoint. This also means I can easily test my workflow using a tool like Fiddler or Postman.

I do this using a “manual trigger”, specifically the “Request – When an HTTP request is received”. You’ll find this under the “Show Microsoft Managed APIs” list. I also keep the request body simple, just a string that is the “customer” identifier, and another that is the “product” identifier. In the Logic App, the request body schema looks like this:

{
    "properties": {
        "customer": {
            "type": "string"
        },
        "product": {
            "type": "string"
        }
    },
    "title": "Product Order",
    "type": "object"
}

So it’s a simple JSON object called “Product Order” that contains two string properties, “customer” and “product”. I don’t think it could get much simpler.
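
So a test request from Fiddler or Postman just needs to POST a body like this (the values are made up):

{
    "customer": "Contoso",
    "product": "widget"
}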

Saving the transaction to our “message box”

So the first step in re-creating BizTalk’s functionality is having a place to store the messages. Logic Apps has the ability to perform actions on Azure SQL DB, so I opted to leverage this connector. So I created a database and put a table in it that looked like this:

RowID – a GUID that has a default value of “newid()”

Customer – a string

Product – a string

Complete – a boolean value, defaults to false

There are several SQL Connectors available in Logic Apps, including an “insert row” action. However, I struggled to get it to work. I wanted to use the defaults I had set up in the database for RowID and Complete fields. And the insert row action told me I had to specify the values for all columns in the table. I opted instead to create a stored procedure in the database, and use the “Execute stored procedure” connector. The stored procedure accepts two parameters, the Customer and Product strings from the HTTP request body.
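
The stored procedure itself is trivial. Here’s a sketch of what it could look like (the procedure name and column sizes are my own); it leans on the table defaults for RowID and Complete:

CREATE PROCEDURE dbo.SaveProductOrder
    @Customer nvarchar(100),
    @Product nvarchar(100)
AS
BEGIN
    -- RowID and Complete fall back to their column defaults (newid() and false)
    INSERT INTO dbo.Orders (Customer, Product)
    VALUES (@Customer, @Product);
END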

With the message safely saved in the database, we respond back to the requestor, letting them know we have received and processed it. This is done via the “Response” action. I could have done this immediately after the request was received, but I wanted to make sure to wait until after I had saved the message. This way I’m really indicating to the requestor that I’ve received AND processed their request.

Correlating the orders

With the orders now saved, I can begin the process of adding my custom business logic to correlate the orders. I created an Azure Function app, and defined a new function named “CheckOrderComplete”. This will be a C# based function triggered (again) by an HTTP trigger. I chose C# because the partner I’m working with does much of their work in C#, so it was a good fit. The HTTP trigger made sense since we’re already using HTTP operations for the workflow trigger. Why not remain consistent? The objective of the function would be to query the database and see if I had the two transactions I needed to have a “complete” transaction on my end.

Azure Functions provides a C# developer reference that was really helpful. However, it still took a bit of trial and error my first time. So I’m going to try and break down my full csx file into the various changes I made. The first change was that I needed to make sure I referenced the assembly that would allow me to interact with the SQL DB where the requests had been stored.

   
#r "Newtonsoft.Json"
#r "System.Data"   

The Newtonsoft line was already there since I’m dealing with an HTTP request that would need to have its JSON payload translated into C#. But I had to reference the external System.Data assembly, so I added the second line so I’d have the SQL Client. The #r is like adding a reference in a Visual Studio project. This assembly is already available in the Azure Functions hosting environment (think of it as the Nuget package already being installed), so no other action was necessary to make it available for my use.

Next up, I had to add a “using” clause to the code so I could leverage the assembly I just referenced. This works just like it would in a Visual Studio project.

using System.Data.SqlClient;   

This function is called via an unsecured (it is only a POC after all) web hook. So the parameters are going to come to us as a JSON object that I need to deserialize and validate…

string jsonContent = await req.Content.ReadAsStringAsync();
dynamic data = JsonConvert.DeserializeObject(jsonContent);
  
if (data.customer == null || data.product == null) {
    return req.CreateResponse(HttpStatusCode.BadRequest, new {
        error = "Please pass customer/product properties in the input object"
    });
} 

And now that I know my customer and product parameters are both here, I can use those to check the database. This looks like any other SQL command I’d execute.

  
SqlConnection sqlConnection1 = new SqlConnection("{your connection string}");
SqlCommand cmd = new SqlCommand();

cmd.CommandText = "select count(Distinct Product) from Orders where Customer = '" + data.customer + "' AND complete = 0";  
cmd.CommandType = System.Data.CommandType.Text;
cmd.Connection = sqlConnection1;

sqlConnection1.Open();
  
int productCnt;  
int.TryParse(cmd.ExecuteScalar().ToString(), out productCnt);
log.Info("Product Count for Customer '" + data.customer + "' is " + productCnt.ToString());
  
sqlConnection1.Close();    

I took the easy route and hard-coded my connection string since this is a POC. The function will run in an App Service, so I can set application environment variables and store it there. This would definitely be the preferred approach for production code.
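
A minimal sketch of that preferred approach, assuming an application setting named “SqlConnectionString” (the name is my own), would be to read the value at runtime instead of using a literal:

// read the connection string from an application setting rather than hard-coding it
string connectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
SqlConnection sqlConnection1 = new SqlConnection(connectionString);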

With the SQL check complete, I have a count of the number of orders for the customer that have not been completed. I’m expecting at least 2. So we can now check the count and respond to the caller with a succeed or fail.

if (productCnt > 1) {
     return req.CreateResponse(HttpStatusCode.OK, new {
         greeting = $"Customer Order is complete!"
     }); 
} else {
     return req.CreateResponse(HttpStatusCode.BadRequest, new {
         greeting = $"Order Incomplete!"
     }); 
}

This check is pretty basic, I’ll admit. But for my POC it does the job. It counts the distinct products (yeah, embedded SQL… injection worries… I know) that are not complete for a given customer and checks that there are at least two. The real example would be far more robust and also (I hope) more secure. You could also let the function perform additional operations, such as updating both items as complete. It’s up to you.
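
If you did want to close that injection hole, swapping the string concatenation for a parameterized query is a small change (a sketch against the same command object):

// same query, but with the customer value passed as a SQL parameter
cmd.CommandText = "select count(Distinct Product) from Orders where Customer = @customer AND complete = 0";
cmd.Parameters.AddWithValue("@customer", (string)data.customer);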

With the function created (and tested), we can then connect it to the Logic App workflow. The two product teams have made this really simple. With that action complete, we then add a condition check using the Status Code that was returned from my function. If it’s equal to 200, we drop a message into a queue that another workflow will pick up to complete processing of the order.

[Image: the condition check and queue action in the Logic App designer]

Long Running Workflows

I mentioned above that there would be a second workflow. This second part may take minutes, hours, or even days/weeks to complete. Having a single workflow run that long is problematic. It could lose state mid-process, among a host of other problems. So I wanted to avoid that.

The plan here is that when you send the final event message in our first workflow, in this case to a Service Bus queue, you can set optional parameters, like ScheduledEnqueueTimeUTC, which allows you to send the message now but have it not be visible until perhaps an hour in the future. Then the second workflow would be triggered by the receipt of this event message. That workflow can then check to see if the process has been completed (perhaps using another Azure Function) and, when complete, drop a message into yet another queue to signal completion. If it’s not yet complete, it drops the message back into the queue with a scheduled enqueue time again set in the future.

This allows that workflow, which could take weeks, to ensure that its “state” is maintained, even if the workflow itself needs to restart.
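
In code view, scheduling the message is just one more field on the Service Bus send action’s body. Here’s a sketch, assuming the connector exposes the ScheduledEnqueueTimeUtc field and using a made-up ComposeOrderMsg action for the content:

"body": {
    "ContentData": "@{encodeBase64(string(outputs('ComposeOrderMsg')))}",
    "ContentType": "JSON",
    "ScheduledEnqueueTimeUtc": "@{addHours(utcNow(), 1)}"
}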

Summary

So it may not seem like much. But I was pretty excited to find a way to accomplish what my partner was after in only 34 lines of code (once you remove all the wrapper stuff). And they were pleased with the end product.

Admittedly, as I write this, Logic Apps and Functions are both still in preview. And there are some rough edges. But there’s a significant amount of potential to be leveraged here, so I have little doubt that they will be cleaning those edges up. And should you want to build this yourself, I’ve added some of the code to my personal github repository.

Enjoy, and until next time!