Azure’s new Event Hub

Over the past few months, I had the good fortune to be accepted to present at ThatConference in Wisconsin and CloudDevelop in Ohio. I count myself even more fortunate because at the time I submitted my session for both these events, it was about a new Azure solution that hadn’t even been announced yet, the Event Hub.

Whenever possible, I like to put demos into a real-world context. For this one, I reached out to two colleagues who were presenting at ThatConference, and collectively we came up with the idea of a conference attendee tracking solution. For my part of the series, I was going to cover using Event Hub to ingest event messages from various sources (social media, mobile apps, and proximity sensors) and feed them into the hub. I also wrote some code so that the other sessions could consume the messages.

Event Hub vs. Topics/Queues

The first thing to get out of the way is that Event Hub is NOT just a new variation on Topics/Queues. For this, I’ve found a simple visual example works best.

This is topics/queues

This is Event Hub

The key differentiator between the two is scale. A courier can pick up a selection of packages, and ensure they are delivered. But if you need to move hundreds of thousands of packages, you can do that with a lot of couriers, or you could build a distribution center capable of handling that kind of volume more quickly. Event Hub is that distribution center. But it’s been built as a managed service so you don’t have to build your own expensive facility. You can just leverage the one we’ve built for you.

In Service Bus, topics and queues are about the transportation and delivery of a specific payload (the brokered message) from point A to point B. These come with specific features (guaranteed delivery, visibility controls, etc…) that also limit the scale at which a single queue can operate. Event Hub was built to solve the challenge of scaled ingestion of messages, but does so by trading away these types of atomic operations. The easiest way to think of Event Hub is as a giant buffer into which you place messages, where they are automatically retained for a given period of time. You then have the ability to read those messages much as you would read a file stream from disk. You can even rewind all the way back to the beginning of the stream and process everything again.

And as you might expect given the different focus of the two solutions, the programming models are also different. So it’s also important to understand that switching from one to the other isn’t simply a matter of switching the SDK.

What is the Event Hub?

If you think back to Topics/Queues, you had the option of enabling partitions via the EnablePartitioning property. This would cause the topic or queue to switch from a single broker (the service-side edge compute node) to 16 brokers, increasing the total throughput of the queue 16 times. We call this approach partitioning, and it is exactly what Event Hub does.

When you create an Event Hub, you determine the number of partitions that you want (from 16, the default, up to 1024). This allows you to scale out the processes that need to consume events. Partitions are also used to help direct messages. When you send an event to the hub, you can assign a partition key, which is in turn hashed by the Event Hub brokers so that the event lands in a given partition. This hash ensures that as long as the same partition key is used, the events will be placed into the same partition and in turn will be picked up by the same receiver. If you don't specify a partition key, the events will be distributed randomly.

When it comes to throughput, this isn't the end of the story. Event Hubs also have "throughput units". By default you start with a single throughput unit, which allows 1 MB/s in and 2 MB/s out through your hubs. You can request to have this scaled up to 1000 throughput units. Throughput units are purchased at the namespace level, since they apply to all the event hubs in that namespace.

So what we have is a service that can scale to handle massive ingestion of events, combined with a huge buffer, just in case the back end (which also features scalable consumption) can't keep up with the rate at which messages are being sent. This gives us scale on multiple facets, all as a managed, hosted service.

So about that presentation…

So the next obvious question is, "how does it work?" This is where my demos came in. I wanted to show Event Hub consuming events from multiple sources: a social media feed, a mobile app used by vendors to scan attendee badges, and proximity sensors scattered around the conference to help measure session attendance.

I started by realizing that when I consumed events, I'd need to know what type they were (i.e., how to deserialize them). To make this easy, I started by defining my own custom .NET message types. I selected Twitter for the social media feed, and defined a message type class for the tweets.
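Roughly like this, as a minimal sketch (the class and property names here are placeholders rather than the exact ones from the demo solution):

// requires System.Runtime.Serialization
[DataContract]
public class TweetEvent
{
    [DataMember]
    public string UserName { get; set; }    // who tweeted

    [DataMember]
    public string Text { get; set; }        // the text of the message

    [DataMember]
    public DateTime CreatedAt { get; set; } // when it was created
}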

So we have who tweeted, the text of the message, and when it was created. I decorated the class with various data attributes to aid in serialization.

When a tweet is found, we’ll need a client to send the event…
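A minimal sketch of that (the app setting name for the hub is a placeholder; the connection string setting is the standard Microsoft.ServiceBus.ConnectionString one):

// requires the Azure Service Bus NuGet package (Microsoft.ServiceBus.Messaging)
string connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
string eventHubName = ConfigurationManager.AppSettings["EventHubName"];

EventHubClient client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);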

This creates an EventHubClient object, using a connection string from the configuration file, and a configuration setting that defines the name of the hub I want to send to.

Next up, we need to create my event object, and serialize it.
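For example (the tweet values here are made up; in the demo they would come from the Twitter feed):

// build the event and serialize it as JSON
TweetEvent tweetEvent = new TweetEvent
{
    UserName = "@someattendee",
    Text = "Having a great time at ThatConference!",
    CreatedAt = DateTime.UtcNow
};

string serializedEvent = JsonConvert.SerializeObject(tweetEvent);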

I opted to use Newtonsoft's .NET JSON serializer; it was already brought in by the Event Hub NuGet package. JSON is lighter weight than XML, and since Event Hub is billed on total throughput, not the number of messages, it made sense to keep the payloads as small as was convenient.

Finally, I have to actually send the message:
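Along these lines (the partition key and property values are my choices for this sketch):

// wrap the serialized JSON in an EventData, tag it, and send it
EventData eventData = new EventData(Encoding.Unicode.GetBytes(serializedEvent))
{
    PartitionKey = "twitter" // same key -> same partition -> same receiver
};

// custom property so consumers know how to deserialize the payload
eventData.Properties["Type"] = "Tweet";

client.Send(eventData);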

I create an instance of the EventData object using the serialized event, and assign a partition key to it. I also add a custom property that my event processors can then use to determine how to deserialize the event. Finally, I call the EventHubClient's Send method, passing in my event. By default, the .NET client does all of this using AMQP 1.0 under the covers, but you can also do it via REST from just about any language you want.

In another part of the presentation's demo, for example, I use a web page with embedded JavaScript to simulate the vendor mobile device app for scanning attendee badges, posting events to the hub via its REST API. In that case, the partition key is part of the URI and is set to "vendor". While I'm still sending a JSON payload, that one uses a UTF-8 encoding instead of the Unicode (UTF-16) encoding used by the .NET client. This is another reason it's important to have an event type available when we're consuming events.

Now you'll recall I explained that the partition key is used to help distribute the events so that we end up with a fairly even distribution among the consuming processes. So why would I choose to bind each of my examples to a single partition? In my case, I knew that volumes would be low, so there wasn't much of an issue with overloading my consuming processes. But you can also use this approach if you want to ensure that the same consuming process always gets the events from the same source. Something that can be really handy if the consuming process is using the events to maintain an in-memory state model of some type.

So what about consuming the events?

Events are consumed via “consumer groups”. Each group can track its position within the overall event hub ‘stream’ separately, allowing for parallel consumption of events. A default group is created when the event hub is created, but we can create our own. Consuming processes in turn create receivers, which connect to the various partitions to actually consume the events. This would normally require you to code up some rather complicated logic to ensure that if the process that owns a given set of receivers becomes unavailable, another process can pick up the slack. Fortunately, the event hub team thought of this already and created another nuget package called the EventProcessorHost.

Simply put, this is a handy .NET-based approach to handling resiliency and fault tolerance for event consumers/receivers. It uses Azure Storage blobs to track which receivers are attached to a given partition in an event hub. If you add or remove consuming processes, it will redistribute the receivers accordingly. I used this approach in my presentation to create a simple console app that displays the events coming into the hub. There are really just three parts to the solution: the program itself, a receiver class, and an event processor class.

The console program is the simplest bit of code…
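Something along these lines (the consumer group name and config setting names are placeholders, and the Receiver constructor arguments may differ slightly from the class that ships with the sample):

static void Main(string[] args)
{
    string connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    string eventHubName = ConfigurationManager.AppSettings["EventHubName"];
    string consumerGroupName = "displayconsole";

    // create the consumer group if it doesn't already exist
    NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
    namespaceManager.CreateConsumerGroupIfNotExists(eventHubName, consumerGroupName);

    // spin up the receiver and let it distribute threads across the hub's partitions
    Receiver receiver = new Receiver(eventHubName, consumerGroupName);
    receiver.MessageProcessingWithPartitionDistribution();

    Console.WriteLine("Receiving events. Press enter to stop.");
    Console.ReadLine();
}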

We use the namespace manager to create a consumer group if the one we want doesn’t already exist. Then we instantiate a Receiver object, and tell that object to start processing events, distributing threads across the various partitions in the event hub. The nuget package comes with its own Receiver class, so there’s not much you really need to do. The core of the receiver is in the MessageProcessingWithPartitionDistribution method.

You'll note that this may actually be a bit different than the version that arrives with the NuGet package. This is because I've modified it to use a consumer group name I specify, instead of just the default. Otherwise, it's the same example. I get the Azure Storage connection string (where the blobs that will control our leases will go), and then use that to create an EventProcessorHost object. We then tell the host to start asynchronous event processing (via RegisterEventProcessorAsync). This registration actually points to our third class, which implements the IEventProcessor interface. Again, a template is provided as part of the NuGet package, so we don't have to write the entire thing ourselves. But if you look at its ProcessEventsAsync method, we see the heart of it…
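A sketch of what that method looks like, following the SimpleEventProcessor template from the package (the event type names, encodings, and the exact one-minute checkpoint interval here are my placeholder choices):

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (EventData eventData in messages)
    {
        // the "Type" property we set when publishing tells us how to deserialize the payload
        string eventType = (string)eventData.Properties["Type"];
        byte[] payload = eventData.GetBytes();

        switch (eventType)
        {
            case "Tweet": // sent by the .NET client using Unicode (UTF-16)
                TweetEvent tweet = JsonConvert.DeserializeObject<TweetEvent>(Encoding.Unicode.GetString(payload));
                Console.WriteLine("{0} tweeted: {1}", tweet.UserName, tweet.Text);
                break;

            default: // the REST/JavaScript publishers sent UTF-8
                Console.WriteLine(Encoding.UTF8.GetString(payload));
                break;
        }
    }

    // checkpoint roughly once a minute so a restart doesn't re-read old events
    // (this.checkpointStopWatch is a Stopwatch started when the processor opens)
    if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1))
    {
        await context.CheckpointAsync();
        this.checkpointStopWatch.Restart();
    }
}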

What's happening behind the scenes is that a thread is being spun up for each partition on the Event Hub. This thread uses a blob lease to take ownership of a partition, then attaches a receiver and begins consuming the events. Each time it pulls events (by default, it will pull 10 at a time), the method shown above is called. This method just loops through the collection of events, and every minute it tells the EventProcessorHost to checkpoint (record where we are in the stream) our progress. Inside the foreach loop is the code that looks at each event, deserializes it appropriately, and displays it on the program's console.

You can see we're checking the event's "Type" property, and then deserializing it back into an object using the proper type and encoding. It's a simple example, but I think it drives the point home.

We can see some of what's going on under the covers of the processor by looking at the blob storage account we've associated with it. First up, the EventProcessorHost creates a container in the storage account that is named the same as our event hub (so if you have multiple hubs with the same name in different namespaces, be sure to use different storage accounts). Within that container is a blob named "eventhub.info" which contains a JSON payload that describes the container and the hub.

{"EventHubPath":"defaulthub","CreatedAt":"2014-10-16T20:45:16.36","PartitionCount":16}

This shows the location of the hub, when this container/file was created, and the number of partitions in the hub. Getting the number of partitions is why you must use a connection string or SAS for the hub that has manage permissions. Also within this container is one blob (zero indexed) for each partition in the hub. These blobs also contain JSON payloads.

{"PartitionId":"0","Owner":"singleworker","Token":"642cbd55-6723-47ca-85ee-401bf9b2bbea","Epoch":1,"Offset":"-1"}

We have the partition this file is for, the owner (aka the EventProcessorHost name we gave it), a token (presumably for the lease), an epoch (not sure what this is for YET), and an offset. This last value is our position in the stream. When you call CheckpointAsync from our SimpleEventProcessor, it updates the value of the offset so we don't read old messages again.

Now, if we spin up two copies of this application, after a minute or so you'll see the two change ownership of various partitions. Messages start appearing in both, and provided you're spreading your messages over enough partitions, you'll even be able to see the partition keys at work as different clients get messages from specific partitions.

Ok, but what about the presentations?

Now, when I started this post, I mentioned that there was a presentation and set of demos to go along with it. I've uploaded both for you to take away and use as you see fit. So enjoy!

Annotated Event Hub PowerPoint Presentation Deck and Event Hub Visual Studio 2013 Demo Solution (contains 3 demos and 5 projects)

Until next time!

Shared Access Signatures with Azure Service Bus

Sorry for the silence folks. Microsoft's fiscal year ended June 30th and I started on a new team on July 1st. While July and August are usually great periods for folks like me to get some extra blogging done, I've had a few distractions that kept me from writing. Namely learning, learning, learning and trying to find my "voice" on the new team.

Going forward, I’m going to start having more of a focus on this “internet of things” fad that everyone’s talking about. And within that, I’m going to be sticking fairly close to home and working on the services side of things. Even more tightly focused, I’m going to go deeper on “build” vs “buy” scenarios and focus a goodly amount of my available time on one of the Azure product collections I’ve often felt didn’t get enough respect… Service Bus.

So in the coming weeks expect to see blog posts on Event Hub, possibly Hybrid Connections, and for starters, setting a few things straight about Service Bus in general.

SAS vs. ACS

So my first order of business is to call out a 'stealth update' that happened recently. As of sometime on/after August 21st, 2014, if you create a new Service Bus root namespace via the Azure Management Portal, it will no longer include an associated Access Control Service namespace. This change is in keeping with guidance the product team has been giving for some time: namely, to use SAS over ACS whenever possible.

Note: You can still generate the associated ACS namespace automatically by using the new-azuresbnamespace powershell cmdlet. Just be aware that a new version of this will be coming soon that will match the default behavior of the management portal. When the new cmdlet is available, you will need to append the “-useAcs true” parameter to the command if you still want to create the ACS namespace.

There are a few reasons for this guidance. The first is that, according to the data the team had available, many folks doing ACS authentication were using the "owner" credential. This identity had FULL control over the namespace, so using it was akin to having an app use the SA (system administrator) identity for a SQL database. Secondly, ACS requires two calls for a first-time operation: one to get the ACS token, and one to perform the requested Service Bus operation. Now, the token had a default time to live of 3 hours, but some SDKs didn't cache it, and as a result every operation against Service Bus generated two calls, which increases the latency of the operation significantly. As if this weren't enough, ACS only supports about 40 authentications per second. So if your SDK didn't cache the token, your possible throughput on a single queue drops from somewhere near 2,000 messages a second down to 40 at best.

Now, ACS does have some benefits. In general, folks are much more familiar with username/password models than shared access signatures. You could create identities for specific publishers/consumers (within reason), as well as scope those identities and their permissions to specific paths, allowing a single identity to send/receive from multiple queues, for example. It also had the ACS management namespace with a GUI to help manage things. And to shut down access, all one has to do is revoke the identity and access is cut off. But many of these needs can also be met using Shared Access Signatures, if one knows how. And that is what I'd like to start helping you with in this post.

Shared Access Policies/Rules & Connection Information

Ok, first issue… If you use the management portal, you’ll see the ability to create/manage Shared Access Policies, but in the SDK and API, these are referred to as a SharedAccessAuthorizationRule. For the sake of simplicity, and consistency with Azure storage, I’ll refer to this from now on simply as “policies” (which matches the Azure Storage naming).

In Service Bus terms, a policy (aka SharedAccessAuthorizationRule) is a named set of permissions associated with an entity. The entity can be the Service Bus’ root namespace (the name you gave it when it was created), a queue, a topic, or an event hub. Basically an addressable endpoint that has a name assigned to it. For each entity you can have up to twelve policies and each policy is allowed a mix of the same three permissions: manage, send, and listen. Each policy also has two access keys much like Azure Storage and for the same reason. So you can do key swaps periodically and ensure you always have at least one active, valid key available to your applications when an old one is regenerated.
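As a quick illustration, here's a minimal sketch of adding a policy to a queue programmatically via the NamespaceManager (the policy name and queue path are just examples):

// assumes a connection string with Manage rights on the namespace
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue("vendor-queue2");

// a "Publish" policy that can only send, with a generated primary key
queue.Authorization.Add(new SharedAccessAuthorizationRule(
    "Publish",
    SharedAccessAuthorizationRule.GenerateRandomKey(),
    new[] { AccessRights.Send }));

namespaceManager.UpdateQueue(queue);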

It’s these policies that are the “Connection Information” you access via the portal and see available as SAS connection strings. And it’s the connection strings that lead me to a bit of an issue I have with how many service bus demos are done.

Service Bus Clients

When you create your first service bus project using the .NET SDK and one of the tutorials, you’ll likely be asked at some point to add code that looks like the following:

// Create the QueueClient
QueueClient client = QueueClient.Create("vendor-queue2");

// insert the message body into the request
BrokeredMessage message = new BrokeredMessage("hello world!");

// execute the request and get the response
client.Send(message);

Notice that the sample specifies a queue name that we want a client for, but no credentials. That’s because within the SDK, this method is overloaded to look for an application configuration setting by the name of “Microsoft.ServiceBus.ConnectionString”. The value of this string is the SAS connection string you can get from the portal. It gives the application access to the entity until such time as the policy/rule is removed. In other words, you can re-write this code to look like this:

// Create the QueueClient from a connection string
QueueClient client = QueueClient.CreateFromConnectionString(
    "Endpoint=sb://<namespace>/;SharedAccessKeyName=<SharedAccessRuleName>;SharedAccessKey=<RuleKey>",
    "vendor-queue2");

// insert the message body into the request
BrokeredMessage message = new BrokeredMessage("hello world!");

// execute the request and get the response
client.Send(message);

By using CreateFromConnectionString in place of the simple Create, we can specify our own connection string. But again, this is permanent access until the policy/rule is removed. It also highlights the issue I mentioned earlier with the way the available samples/demos work. I bemoaned the use of the "owner" credential when doing ACS. Let's look at the default connection string that the Service Bus NuGet package puts into the application configuration file:

    <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]" />

This sample refers to a policy/rule named “RootManageSharedAccessKey”. When you create a new namespace, this default policy has been created for you automatically with permission to listen, send, AND MANAGE the entire namespace. Please! For the love of all that is digital, NEVER… EVER use this credential for anything other than an application that needs to manage all aspects of a given namespace! Instead, configure your own policies with just the permissions that are needed. If you put the default policy into all your apps, we’re right back to the “owner” credential situation we had with ACS.

There’s another issue with this approach. Rules/policies must be associated with a specific service bus entity. This is where SAS comes into play.

Our First SAS

Entities in Service Bus are accessible by a path that lies directly under the root namespace. Together with the namespace, each entity can be represented by a full address, like so…

brentsample-ns.servicebus.windows.net/my-queue
brentsample-ns.servicebus.windows.net/my-eventhub
brentsample-ns.servicebus.windows.net/your-topic

Now I could create a policy at the root namespace that has “send” permission. But using it as a connection string would give the sender access to send to everything in the namespace. Alternatively, I could create individual policies/rules on each of these entities. But then I need to juggle all those different connection strings.

If we opt to use a SAS, we have a simpler way to restrict access, and we also make management a touch easier by creating signatures that allow access to a partial path, namely something like all entities that begin with "my-". Unfortunately, the management portal does not yet provide the ability to create a SAS, so we either need to write a bit of .NET code or leverage a 3rd party utility. Fortunately the code is pretty simple. Using Visual Studio, you can create a new Console Application and then add the NuGet package for Azure Service Bus. Then all that remains is to populate some variables and use these two lines of code to generate our signature.

var serviceUri = ServiceBusEnvironment.CreateServiceUri("https", sbNamespace, sbPath).ToString().Trim('/');
string generatedSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(sbPolicy, sbKey, serviceUri, expiry);

The important variables in here are:

sbNamespace – the name of the service bus namespace. Don't include the ".servicebus.windows.net" part, just the name we gave it when it was created.
sbPath – the name, or partial name, of the entities we want to access. For this example, let's pretend it's "my-"
sbPolicy – the rule/policy that has the permissions we want the signature to include
sbKey – one of the two secret keys of the rule/policy we're referencing
expiry – a date/time of when the signature should expire

If we fill these in, we get a signature that looks something like:

SharedAccessSignature sr=https%3a%2f%2fbmssample-ns.servicebus.windows.net%2fmy-&sig=B9cy8OEuxum2UN5VjsC4JPhbVU7jwJi%2bq20qiaXk24s%3d&se=64953912194&skn=Publish

Now that we have this signature, we want to be able to use it to interact with one of our entities. There’s no “CreateFromSAS” option, but fortunately in the .NET SDK we can use this signature together with a MessagingFactory to create our entity client.

MessagingFactorySettings mfSettings = new MessagingFactorySettings();
mfSettings.TransportType = TransportType.NetMessaging;
mfSettings.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("<signature we just created>");
MessagingFactory mf = MessagingFactory.Create("sb://thatconference-ns.servicebus.windows.net", mfSettings);

// Create Client
QueueClient client = mf.CreateQueueClient(queueName);

And in this case, the same signature will work for the queue 'my-queue' or the event hub 'my-eventhub' (although for the latter the TransportType needs to be Amqp). We can even take this same signature and put it into JavaScript, for use perhaps in a NodeJS application…

var xmlHttpRequest = new XMLHttpRequest();
xmlHttpRequest.open('POST', 'https://thatconference-ns.servicebus.windows.net/my-eventhub/publishers/vendorA-DeviceIDnnn/messages', true);
xmlHttpRequest.setRequestHeader('Content-Type', 'application/atom+xml;type=entry;charset=utf-8');
xmlHttpRequest.setRequestHeader('Authorization', '<SharedAccessSignature>');

In the case of Event Hub, where we'll have various publishers, we can do exactly the same thing using a rule/policy from the event hub and generate a signature that goes deeper, like "my-eventhub/publishers/vendorA-".

Policy Expiry and Renewal

So here's the $50,000 question. With ACS I could (at least to a certain scale) have a single identity for each client and remove them at any time. With SAS, I can remove a signature by removing its underlying policy at any time, but since I'm limited to twelve policies, how do I recreate the ability to revoke on demand? Simply put, you don't.

Now before you throw your hands up and state with exasperation that this approach “simply won’t work”, I do ask you to take a long hard look at this requirement. In my experience, I find that the vast majority of the time, allowing someone to publish to an entity is a matter of trust. You’ve entered into an agreement with them and aren’t likely to revoke that access on a whim. The nature of this trust is rarely volatile/short term in nature. If it’s a customer, they are usually engaged for the term of their subscription (monthly or annual are common). So you know when your trust will expire and need to be renewed.

In this situation, we are planning for an eventuality that rarely comes to pass, and one that has an alternative that requires just a small amount of work: implementing your own "credential broker".

If we look back at what ACS did, you presented it with credentials and it issued you a token. You then used that token to access the appropriate Service Bus resources. That's exactly what our credential broker would do. Periodically, the requesting client calls a service you host and presents some credentials (username/password, certificate, PSK, etc…). Your broker service validates the request (and likely logs it), then issues back a SAS 'token' with an appropriate expiry. The client then (hopefully) caches the SAS 'token' and uses it on subsequent requests until it expires, then goes back to the broker to get a new one.
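A minimal sketch of the issuing side of such a broker (the policy name, key, path, token lifetime, and the ValidateClient check are all placeholders):

public string IssueSas(string clientId, string clientSecret)
{
    // your own credential check and logging goes here
    if (!ValidateClient(clientId, clientSecret))
        throw new UnauthorizedAccessException();

    // scope the signature to the entities this client should reach
    string resourceUri = "https://brentsample-ns.servicebus.windows.net/my-";

    // hand back a signature that's only good for a limited time
    return SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
        "Publish",              // policy/rule name with the rights we want to grant
        "<rule's secret key>",  // one of that rule's two keys
        resourceUri,
        TimeSpan.FromHours(24));
}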

Now, if this isn't enough, we still have the ability to remove/disable the queue (or event hub). So in a way we get the best of both worlds: automatic expiry of tokens to ensure "key rotation", while also having the ability to revoke access immediately.

This is just one possible pattern. So instead of offering up alternatives, I’d love to hear from any of you via the comments on the patterns you have used to help manage shared access signatures.

Expiration Reached

In the coming weeks/months I'm going to generate a series of posts that will dive into various Service Bus related topics more deeply. If there's something specific you'd like to see, please let me know. Otherwise you can look forward to posts on accessing Service Bus from other languages/SDKs, programmatic management of namespaces/rules, and resilient architectures around Service Bus.

I hope this article has helped clear up some of the fog around the Azure Service Bus. So until next time!

Azure File Depot

It’s been a little while since my last update. BUILD 2014 has come and gone and my group, DPE (Developer Platform Evangelism), has been re-branded to DX (Developer Experience). Admittedly a name is a name, but this change comes at a time when my role is beginning to transform. For the last 18 months, my primary task has been to help organizations be successful in adopting the Azure platform. Going forward, that will still remain significant, but now we’re also tasked with helping demonstrate our technology and our skills through the creation of intellectual property (frameworks, code samples, etc…) that will assist customers. So less PowerPoint (YEAH!), more coding (DOUBLE YEAH!).

To that end, I've opted to tackle a fairly straightforward task that's come up often: namely, the need to move files from one place to another. It's come up at least 3-4 times over the last few months, so it seems like a good first project under our changing direction. So I'd like to present to you my first open-source effort, the Azure File Depot.

What is it?

In short, the Azure File Depot is an effort to provide sample implementations of various tasks related to using blob storage to move files around. It contains a series of simple classes that help demonstrate common tasks (such as creating a Shared Access Signature for a blob) and business challenges (the automated publication of files from on-premises to Azure and/or an Azure hosted VM).

Over time, it's my hope that we may attract a community around this and evolve this little project into a true framework. But in the meantime, it's a place for me to do what I do best, and that's take learnings and share them. At least I hope I'm good at it, since it's a significant portion of my job.

What isn’t it?

I need to stress that this project isn’t intended to be a production ready framework for solving common problems. The goal here is to create a collection of reference implementations that address some common challenges. While we do want to demonstrate solid coding practices, there will be places where the samples take less than elegant approaches.

I’m fully aware that in many cases there may be better implementations out there, in some cases perhaps even off the shelf solutions that are production ready. That’s not what this project is after. I have many good friends in Microsoft’s engineering teams that I know will groan and sigh at the code in this project. I only ask that you be kind and keep in mind, this is an educational effort and essentially one big collection of code snippets.

So what’s in it currently?

What we've included in this first release is the common ask I referred to above: the need to take files generated on-premises and push them to Azure, either letting them land in blob storage or having them eventually land in an Azure hosted VM.

Here’s a diagram that outlines the scenario.

[Diagram: FileDepotDiagram, showing the six-step publication flow described below]

Step 1 – File placed in a folder on the network

The scenario begins with a file being created by some process and saved to the file system. This location could be a network file share or just as easily could be on the same server as the process itself.

Step 2 – The Location is monitored for new files

That location is in turn monitored by a "Publication Service". Our reference implementation uses the C# FileSystemWatcher class, which allows the application to receive notification of file change events from Windows.

Step 3 – Publication service detects file and uploads to blob storage

When the creation of a new file raises an event in the application, the publishing app waits to get an exclusive lock on the file (making sure nothing is still writing to it), then uploads it to a blob container.
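A rough sketch of that wait-then-upload logic (the container name, retry delay, and storage connection string variable are placeholder choices, not necessarily what the File Depot code does):

// fired by the FileSystemWatcher's Created event
private void OnFileCreated(object sender, FileSystemEventArgs e)
{
    // wait until we can open the file exclusively, i.e. nothing is still writing to it
    FileStream stream = null;
    while (stream == null)
    {
        try
        {
            stream = File.Open(e.FullPath, FileMode.Open, FileAccess.Read, FileShare.None);
        }
        catch (IOException)
        {
            Thread.Sleep(500); // still locked by the writer, try again shortly
        }
    }

    using (stream)
    {
        // upload the file to a blob container
        CloudStorageAccount account = CloudStorageAccount.Parse(storageConnectionString);
        CloudBlobContainer container = account.CreateCloudBlobClient().GetContainerReference("filedepot");
        container.CreateIfNotExists();

        CloudBlockBlob blob = container.GetBlockBlobReference(Path.GetFileName(e.FullPath));
        blob.UploadFromStream(stream);
    }
}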

Step 4 – Notification message with SAS for blob is published

After the blob is uploaded, the publication service then generates a shared access signature and publishes a message to a “Messages” Service Bus topic so that interested processes can be alerted that there’s a new file to be downloaded.
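Something along these lines, where the SAS lifetime and message shape are illustrative rather than the project's exact choices:

// generate a read-only SAS for the blob we just uploaded
string sasToken = blob.GetSharedAccessSignature(new SharedAccessBlobPolicy
{
    Permissions = SharedAccessBlobPermissions.Read,
    SharedAccessExpiryTime = DateTime.UtcNow.AddHours(48)
});
string blobUriWithSas = blob.Uri.AbsoluteUri + sasToken;

// publish a notification to the "Messages" topic so subscribers know there's a new file
TopicClient topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString, "Messages");

BrokeredMessage notification = new BrokeredMessage(blobUriWithSas);
notification.Properties["FileName"] = blob.Name;
topicClient.Send(notification);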

Step 5 – Subscribers receive message

Processes that want to subscribe to these notifications create subscriptions on that topic so they can receive the alerts.

Step 6 – Download blob and save to local disk

The subscribing process then uses the shared access signature to download the blob, placing it in the local file system.

Now this process could be used to push files from any location (cloud or on-premises) to any possible receiver. I like the example because it demonstrates a few key points of cloud architecture:

  • Use of messaging to create temporal decoupling and load leveling
  • Shared Access Signatures to grant temporary access to secure, private blob storage for potentially insecure/anonymous clients
  • Use of Service Bus Topics to implement pub/sub message model

So in this one example, we have a usable implementation pattern (I've provided all the basic helper classes as well as sample implementations of both console and Windows Service applications). We also have a few reusable code snippets (create a blob SAS, interact with Service Bus topics, upload/download files to/from block blobs).

At least one customer I’m working with today will find these samples helpful. I hope others will as well.

What’s next?

I have a few things I plan to do with this in the near term, namely make the samples a bit more robust: error handling, logging, and maybe even *gasp* unit tests! I also want to add in support for larger files by showing how to implement this with page blobs (which are cheaper if you're using less than 500TB of storage).

We may also explore using this to do not just new file publication, but perhaps updates as well as adding some useful metadata properties to the blobs and messages.

I also want to look at including more scenarios. In fact, if you read this and have a scenario, you can fork the project and send us a pull request. Or if you're wondering how to do something that you think could fit into this project, please drop me a line.

That’s all the time I have for today. Please look over the project as we’d love your feedback.

Service Bus and “pushing” notifications

I put quotes around the word 'pushing' in the title of this post because this isn't a pure "push" scenario but more of a 'solicited push'. Check out this blog post where Clemens Vasters discusses the differences, and realize I'm more pragmatic than purist. :)

So for the last several projects I've worked on, I've wanted to have a push notification system that I could use to send messages to role instances so that they could take actions. There are several push notification systems out there, but I was after something simple that could be included as part of my Windows Azure services. I've put a version of this concept into several proposals, but this week I finally received time to create a practical demo of the idea.

For this demo, I've selected Windows Azure Service Bus Topics. Topics, unlike Windows Azure Storage queues, give me the capability to have multiple subscribers each receive a copy of a message. This was also an opportunity to dig into a feature of Windows Azure I haven't worked with in over a year. Given how much the API has changed in that time, it was a frustrating, yet rewarding exercise.

The concept is fairly simple. Messages are sent to a centralized topic for distribution. Each role instance then creates its own subscriber with the appropriate filter on it so it receives the messages it cares about. This solution allows for multiple publishers and subscribers and will give me a decent amount of scale. I’ve heard reports/rumors of issues when you get beyond several hundred subscribers, but for this demo, we’ll be just fine.

Now, for this demo implementation, I want to keep it simple. It should be a central class that can be used by worker or web roles to create their subscriptions and receive notifications with very little effort. And to keep this simplicity going, it should give me just as easy a way to send messages back out.

NotificationAgent

We’ll start by creating a class library for our centralized class, adding references to it for Microsoft.ServiceBus (so we can do our brokered messaging) and Microsoft.WindowsAzure.ServiceRuntime (for access to the role environment). I’m also going to create my NotificationTopic class.

Note: there are several supporting classes in the solution that I won’t cover in this article. If you want the full code for this solution, you can download it here.

The first method we'll add to this is a constructor that takes the parameters we'll need to connect to our Service Bus namespace, as well as the name/path for the topic we'll be using to broadcast notifications on. The first task is creating a namespace manager, so I can create topics and subscriptions, and a messaging factory that I'll use to receive messages. I've split this out a bit so that my class can support being passed a TokenProvider (I hate demos that only use the service owner). But here are the important lines:

TokenProvider tmpToken = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey);
Uri namespaceAddress = ServiceBusEnvironment.CreateServiceUri("sb", baseAddress, string.Empty);
this.namespaceManager = new NamespaceManager(namespaceAddress, tmpToken);
this.messagingFactory = MessagingFactory.Create(namespaceAddress, tmpToken);

We create a URI and a security token to use for interaction with our Service Bus namespace. For the sake of simplicity I'm using the issuer name (owner) and the service administration key. I'd never recommend this for a production solution, but it's fine for demonstration purposes. We use these to create a NamespaceManager and a MessagingFactory.

Now we need to create the topic, if it doesn’t already exist.

try
{
// doesn’t always work, so wrap it
if (!namespaceManager.TopicExists(topicName))
this.namespaceManager.CreateTopic(topicName);
}
catch (MessagingEntityAlreadyExistsException)
{
// ignore, timing issues could cause this
}

Notice that I check to see if the topic exists, but I also trap for the exception. That's because I don't want to assume the operation is single-threaded. With this block of code running in many role instances, it's possible for another instance to create the topic between my existence check and the create call. So I like to wrap them in a try/catch. You can also just catch the exception, but I've long liked to avoid the overhead of unnecessary exceptions.

Finally, I’ll create a TopicClient that I’ll use to send messages to the topic.
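That's a one-liner off the messaging factory we created in the constructor (shown here as a sketch):

// client used by SendMessage to publish notifications to the topic
this.topicClient = this.messagingFactory.CreateTopicClient(topicName);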

So by creating an instance of this class, I can properly assume that the topic exists, and I have all the items I need to send or receive messages.

Sending Messages

Next up, I create a SendMessage method that accepts a string message payload, the type of message, and a TimeSpan value that indicates how long the message should live. In this method we first create a BrokeredMessage, giving it an object that represents my notification message. We use the lifespan value that is passed in and set the type as a property. Finally, we send the message using the TopicClient we created earlier and do appropriate exception handling and cleanup.

try
{
bm = new BrokeredMessage(msg);
bm.TimeToLive = msgLifespan;
// used for filtering
bm.Properties[MESSAGEPROPERTY_TYPE] = messageType.ToString();
topicClient.Send(bm);
success = true;
}
catch (Exception)
{
success = false;
// TODO: do something
}
finally
{
if (bm != null) // if was created successfully
bm.Dispose();
}

Now the important piece here is the setting of a BrokeredMessage property. It’s this property that can be used later on to filter the messages we want to receive. So let’s not forget that. And you’ll also notice I have a TODO left to add some intelligent exception handling. Like logging the issue.

Start Receiving

This is when things get a little more complicated. Now, the experts (meaning the folks I know/trust who responded to my inquiry) recommend that instead of going "old school" and having a thread that's continually polling for responses, we leverage async processing. So we're going to make use of delegates.

First we need to define a delegate for the callback method:

public delegate bool RecieverCallback(NotificationMessage message, NotificationMessageType type);

We then reference the new delegate in the method signature for the message receiving starter:

public void StartReceiving(RecieverCallback callback, NotificationMessageType msgType = NotificationMessageType.All)

More on this later….

Now inside this method we first need to create our subscriber. Since I want to have one subscriber for each role instance, I’ll need to get this from the Role Environment.

// need to parse out deployment ID
string instanceId = Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.Id;
subscriptionName = instanceId.Substring(instanceId.IndexOf('.') + 1);

SubscriptionDescription tmpSub = new SubscriptionDescription(topicName, subscriptionName);

Now is the point where we'll add in a filter using the property that we set on the notification when we created it.

{
Filter tmpFilter = new SqlFilter(string.Format("{0} = '{1}'", MESSAGEPROPERTY_TYPE, msgType));
subscriptionClient.AddRule(SUBFILTER, tmpFilter);
}

I’m keeping it simple and using a SqlFilter using the property name we assigned when sending. So this subscription will only receive messages that match our filter criteria.

Now that all the setup is done, we'll delete the subscription if it already exists (this gets rid of any old messages and allows us to start clean) and create it anew using the NamespaceManager we instantiated in the class constructor. Then we start our async operation to retrieve messages:

asyncresult = subscriptionClient.BeginReceive(waittime, ReceiveDone, subscriptionClient);

Now, in this call, ReceiveDone is the callback method for the operation. This method is pretty straightforward. We make sure we've actually gotten a message (in case the operation simply timed out) and that we can get the payload. Then, using the delegate we set up earlier, we hand the message off to the registered callback. And then we end by starting another async call to get the next message.

if (result != null)
{
SubscriptionClient tmpClient = result.AsyncState as SubscriptionClient;
BrokeredMessage brokeredMessage = tmpClient.EndReceive(result);
//brokeredMessage.Complete(); // not really needed because your receive mode is ReceiveAndDelete

if (brokeredMessage != null)
{
NotificationMessage tmpMessage = brokeredMessage.GetBody<NotificationMessage>();

// do some type mapping here

recieverCallback(tmpMessage, tmpType);
}
}

// do receive for next message
asyncresult = subscriptionClient.BeginReceive(ReceiveDone, subscriptionClient);

Now, I've added two null checks in this method just to help out in case a receive operation fails. Even then, I won't guarantee this works for all situations. In my tests, when I set the lifespan of a message to less than 5 seconds, I still had some issues (I'm still sorting those out, but wanted to get this sample out).

Client side implementation

Whew! Lots of setup there. This is where our hard work pays off. We define a callback method we’re going to hand into our notification helper class using the delegate we defined. We’ll keep it super simple:

private bool NotificationRecieved(NotificationMessage message, NotificationMessageType type)
{
Console.WriteLine("Received Notification");
return true;
}

Now we need to instantiate our helper class and start the process of receiving messages. We can do this with a private variable to hold our object and a couple of lines in the role's OnStart.

tmpNotifier = new NotificationTopic(ServiceNamespace, IssuerName, IssuerKey, TopicName);
tmpNotifier.StartReceiving(new NotificationTopic.RecieverCallback(NotificationRecieved), NotificationMessageType.All);

Now if we want to clean things up, we can also add some code to the role’s OnStop.

try
{
if (tmpNotifier != null)
tmpNotifier.StopReceiving();
}
catch (Exception e)
{
Console.WriteLine("Exception during OnStop: " + e.ToString());
}

base.OnStop();

And that’s all we need.

In Closing

So that’s it for our basic implementation. I’ve uploaded the demo for you to use at your own risk. You’ll need to update the WebRole, WorkerRole, and NotifierSample project with the information about your Service Bus namespace. To run the demo, you will want to set the cloud service project as the startup project, and launch it. Then right click on the NotifierSample project and start debugging on it as well.

While this demo may work fine for certain applications, there is definitely room for enhancement. We can tweak our message lifespan, wait timeouts, and even how many messages we retrieve at one time. And it’s also not the only way to accomplish this. But I think it’s a solid starting point if you need this kind of simple, self-contained notification service.

PS – As configured, this solution will require the ability to send outbound traffic on port 9354.

Azure Tools for Visual Studio 1.4 August Update–Year of Azure Week 5

Good evening folks. It's 8pm on Friday, August 5th, 2011 (aka International Beer Day) as I write this. Last week's update to my Year of Azure series was weak, but this week's will be even lighter. Just too much to do and not enough time, I'm afraid.

As you can guess from the title of this update, I'd like to talk about the new 1.4 SDK update. Now, I could go to great lengths about all the updates, but the Windows Azure team blog already did, and Wade and Steve covered it in this week's Cloud Cover show. So instead, I'd like to focus on just one aspect of this update, the Azure Storage Analytics.

I can't tell you all how thrilled I am. The best part of being a Microsoft MVP is all the great people you get to know. The second best part is getting to have an impact on the evolution of a product you're passionate about. And while I hold no real illusion that anything I've said or done has led to the introduction of Azure Storage Analytics, I can say it's something I (and others) have specifically asked for.

I don’t have enough time this week to write up anything. Fortunately, Steve Marx has already put together the basics on how to interact with it. If that’s not enough, I recommend you go and check out the MSDN documentation on the new Storage Analytics API.

One thing I did run across while reading through the documentation tonight was that the special container that analytics information gets written to, $logs, has a 20TB limit, and that this limit is independent of the 100TB limit on the rest of the storage account. This container is also subject to being billed for data stored and read/write actions. Delete operations, however, are a bit different: if you delete logs manually, it's a billable operation, but if they're deleted as a result of the retention policies you set, it's not.

So again, apologies for an extremely weak update this week. But I'm going to try to ramp things up and take what Steve did and give you a nice code snippet that you can easily reuse. If possible, I'll see if I can't get that cranked out this weekend. :)

Azure AppFabric Queues and Topics

The code used in this demo refers to a deprecated version of the Topics API. Please refer to this post for an example that is compatible with the 1.6 or 1.7 Windows Azure SDKs.

First some old business…. NEVER assume you know what a SequenceNumber property indicates. More on this later. Secondly, thanks to Clemens Vasters and his colleague Kartik. You are both gentlemen and scholars!

Ok… back to the topic at hand. A few weeks ago, I was making plans to be in Seattle and reached out to Wade Wegner (Azure Architect Evangelist) to see if he was going to be around so we could get together and talk shop. Well, he let me know that he was taking some well-deserved time off. Jokingly, I told him I'd fill in for him on the Cloud Cover show. Two hours later I get an email in my inbox saying it was all set up and I needed to pick a topic to demo and come up with news and the "tip of the week"… that'll teach me!

So here I was with just a couple weeks to prepare (and even less time as I had a vacation of my own already planned in the middle of all this). Wade and I both always had a soft spot for the oft maligned Azure AppFabric, so I figured I’d dive in and revisit an old topic, the re-introduction of AppFabric Queues and the new feature, Topics.

Service Bus? Azure Storage Queues? AppFabric Queues?

So the first question I ran into was trying to explain the differences between these three items. To try to be succinct about it: the Service Bus is good for low-latency situations where you want dedicated connections or TCP/IP tunnels. The problem is that this doesn't scale well, so we need a disconnected messaging model, and we have a simple, fairly lightweight model for this with Azure Storage Queues.

Now, the new AppFabric Queues are more what I would classify as an enterprise-level queue mechanism. Unlike Storage Queues, AppFabric Queues can be bound to with WCF, as well as a RESTful API and a .NET client library. There's also a roadmap showing that we may get the old Routers back along with some message transformation functionality. As if this wasn't enough, AppFabric Queues are supposed to have real FIFO delivery (unlike Storage Queues' "best attempt" FIFO) and full ACS integration.

Sounds like some strong differentiators to me.

So what about the cloud cover show?

So everything went ok, we recorded the show this morning. I was having some technical challenges with the demo I had wanted to do (look for my original goal in the next couple weeks). But I got a demo that worked (mostly), some good news items, and even a tip of the day. All in all things went well.

Anyways… it's likely you're here because you watched the show, so here is a link to the code for the demo we showed. Please take it and feel free to use it. Just keep in mind it's only demo code, so don't just plug it into a production solution.

Being on the show was a great experience and I’d love to be back again some day. The Channel 9 crew was great to work with and really made me feel at ease. Hopefully if you have seen the episode, you enjoyed it as well.

My tip of the day

So as I mentioned when kicking off this update, never assume you know what a SequenceNumber indicates. In preparing my demo, I burned A LOT of my time and bugged the heck out of Clemens and Kartik. All this because I incorrectly assumed that the SequenceNumber property of the BrokeredMessages I was pulling from my subscriptions was based on the original topic. If I was dealing with a queue, this would have been the case. Instead, it is based on the subscription. This may not mean much at the moment, but I'm going to put together a couple of posts on Topics in the next couple of weeks that will bring it into better context. So tune back in later when I build the demo I originally wanted to show on Cloud Cover. :)

PS – If you are ever asked to appear on an episode of the Cloud Cover show, it is recommended you not wear white or stripes. FYI…

Enter the Azure AppFabric Management Service

Before I dive into this update, I want to get a couple things out in the open. First, I'm an Azure AppFabric fan-boy. I see HUGE potential in this often overlooked silo of the Azure Platform. The PDC10 announcements reinforced for me that Microsoft is really committed to making the Azure AppFabric the glue for helping enable and connect cloud solutions.

I'm betting many of you aren't even aware of the existence of the Azure AppFabric Management Service. Up until PDC, there was little reason for anyone to look for it outside of those seeking a way to create new issuers that could connect to Azure Service Bus endpoints. These are usually the same people that noticed all the Service Bus sample code uses the default "owner" issuer that gets created when a new service namespace is created via the portal.

How the journey began

I’m preparing for an upcoming speaking engagement and wanted to do something more than just re-hash the same tired demos. I wanted to show how to setup new issuers so I asked on twitter one day about this and @rseroter responded that he had done it. He was also kind enough to quickly post a blog update with details. I sent him a couple follow-up questions and he pointed me to a bit of code I hadn’t noticed yet, the ACM Tool that comes as part of the Azure AppFabric SDK samples.

I spent a Saturday morning reverse engineering the ACM tool and using Fiddler to see what was going on. Finally, using a schema namespace I saw in Fiddler, I had a hit on an internet search and ran across the article "using the Azure AppFabric Management Service" on MSDN.

This was my Rosetta stone. It explained everything I was seeing with the ACM and also included some great tidbits about how authentication of requests works. It also put a post-PDC article from Will@MSFT into focus on how to manage the new Service Bus Connection Points. He was using the management service!

I could now begin to see how the Service Bus ecosystem was structured and the power that was just waiting here to be tapped into.

The Management Service

So, the Azure AppFabric Management Service is a REST-based API for managing your AppFabric service namespace. When you go to the portal and create a new AppFabric service namespace, you'll see a couple of lines that look like this:

[Image: the namespace properties in the portal, showing the Registry URL, Management Endpoint, and Management STS Endpoint]

Now if you’ve worked with the AppFabric before, you’re well aware of what the Registry URL is. But you likely haven’t worked much with the Management Endpoint and Management STS Endpoint. These are the endpoints that come into play with the AppFabric Management Service.

The STS Endpoint is pretty self-explanatory. It's a proxy for Access Control for the management service. Any attempt to work with the management service will start with us giving an issuer name and key to this STS and getting a token back that we can then pass along to the management service itself. There's a good code snippet in the MSDN article, so I won't dive into this much right now.

It’s the Management EndPoint itself that’s really my focus right now. This is the root namespace and there are several branches off of it that are each dedicated to a specific aspect of management:

Issuer – where our users (both simple users and x509 certs) are stored

Scope – the service namespace (URI) that issuers will be associated with

Token Policy – how long is a token good for, and signature key for ACS

Resources – new for connection point support

It's the combination of these items that then controls which parties can connect to a Service Bus endpoint and what operations they can perform. It's our ability to properly leverage this that will allow us to do useful, real-world things like setting up sub-regions of the root namespace and assigning specific rights for that sub-region to users. Maybe even things like delegating management at that level so various departments within your organization can each manage their own area of the service bus.

In a nutshell, we can define an issuer, associate it with a scope (namespace path) which then also defines the rules for that issuer (Listen, Send, Manage). Using the management service, we can add/update/delete items from each of these areas (subject to restrictions).

How it works

Ok, this is the part where I’d normally post some really cool code snippets. Unfortunately, I spent most of a cold, icy Minnesota Sunday trying to get things working. ANYTHING working. Unfortunately I struck out.

But I’m not giving up yet. I batched up a few questions and sent them to some folks I’m hoping can find me answers. Meanwhile, I’m going to keep at it. There’s some significant stuff here and if there’s a mystery as big as what I’m doing wrong, it’s that I’m not entirely sure why we haven’t heard more about the Management Service yet.

So please stay tuned…

What’s next?

After a fairly unproductive weekend of playing with the Azure AppFabric Management Service, I have mixed emotions. I’m excited by the potential I see here, but at the same time, it still seems like there’s much work yet to be done. And who knows, perhaps this post and others I want to write may play a part in that work.

In the interim, you get this fairly short and theoretical update on the Azure AppFabric Management Service. But this won't be the end of it. I'm still a huge Azure AppFabric fan-boy. I will not let a single bad day beat me. I will get this figured out and bring it to the masses. I'm still working on my upcoming presentation and I'm confident my difficulties will be sorted out by then.
