November 18, 2014
Over the past few months, I had the good fortune to be accepted to present at ThatConference in Wisconsin and CloudDevelop in Ohio. I count myself even more fortunate because at the time I submitted my session for both these events, it was about a new Azure solution that hadn’t even been announced yet, the Event Hub.
Whenever possible, I like to put demos into a real-world context. For this one, I reached out to two colleagues who were presenting at ThatConference, and collectively we came up with the idea of a conference attendee tracking solution. For my part of this series, I was going to cover using Event Hub to ingest event messages from various sources (social media, mobile apps, and proximity sensors). I also wrote some code so that the other sessions could consume the messages.
Event Hub vs. Topics/Queues
The first thing to get out of the way is that Event Hub is NOT just a new variation on Topics/Queues. For this, I’ve found a simple visual example works best.
This is topics/queues
This is Event Hub
The key differentiator between the two is scale. A courier can pick up a selection of packages, and ensure they are delivered. But if you need to move hundreds of thousands of packages, you can do that with a lot of couriers, or you could build a distribution center capable of handling that kind of volume more quickly. Event Hub is that distribution center. But it’s been built as a managed service so you don’t have to build your own expensive facility. You can just leverage the one we’ve built for you.
In Service Bus, topics and queues are about the transportation and delivery of a specific payload (the brokered message) from point A to point B. These come with specific features (guaranteed delivery, visibility controls, etc…) that also limit the scale at which a single queue can operate. Event Hub was built to solve the challenge of ingesting messages at scale, but it does so by trading away these kinds of atomic, per-message operations. The easiest way to think of Event Hub is as a giant buffer into which you place messages, where they are automatically retained for a given period of time. You then have the ability to read those messages much as you would read a file stream from disk. You can even rewind all the way back to the beginning of the stream and process everything again.
And as you might expect given the different focus of the two solutions, the programming models are also different. So it’s also important to understand that switching from one to the other isn’t simply a matter of switching the SDK.
What is the Event Hub?
If you think back to Topics/Queues, you had the option of enabling partitions via the EnablePartitioning property. This would cause the topic or queue to switch from a single broker (the service-side edge compute node) to 16 brokers, increasing the total throughput of the queue by 16 times. We call this approach partitioning, and it is exactly what Event Hub does.
When you create an Event Hub, you determine the number of partitions that you want (from 16, the default, up to 1024). This allows you to scale out the processes that need to consume events. Partitions are also used to help direct messages. When you send an event to the hub, you can assign a partition key, which is in turn hashed by the Event Hub brokers so that it lands in a given partition. This hash ensures that as long as the same partition key is used, the events will be placed into the same partition and in turn will be picked up by the same receiver. If you don’t specify a partition key, the events will be distributed across the partitions for you.
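To make the partition key idea concrete, here is a minimal sketch in plain Python (not the Event Hub SDK). The hash used by the real brokers is internal to the service, so SHA-256 here is purely an assumption for illustration, and `partition_for` is a hypothetical helper name.

```python
import hashlib


def partition_for(partition_key: str, partition_count: int) -> int:
    """Map a partition key to a partition index.

    Illustrative only: the real service uses its own internal hash.
    """
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    # Use the first 8 bytes as an integer, then wrap into the partition range.
    return int.from_bytes(digest[:8], "big") % partition_count


if __name__ == "__main__":
    for key in ("twitter", "vendor-app", "sensor-12", "twitter"):
        print(key, "->", partition_for(key, 16))
```

The point to take away is that the mapping is deterministic: the same key always lands in the same partition, which is what lets a single receiver see every event for that key.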
When it comes to throughput, this isn’t the end of the story. Event Hubs also have “throughput units”. By default you start with a single throughput unit that allows 1 MB/s in and 2 MB/s out through your hubs. You can request that this be scaled up to 1000 throughput units. Throughput units are purchased at the namespace level, since they apply to all the event hubs in that namespace.
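As a rough sizing aid, the arithmetic can be sketched as follows, using only the 1 MB/s in and 2 MB/s out per-unit figures quoted above. The function name is hypothetical, and this ignores any other limits the service may apply.

```python
import math

INGRESS_MB_PER_UNIT = 1.0  # MB/s in per throughput unit (figure from the text)
EGRESS_MB_PER_UNIT = 2.0   # MB/s out per throughput unit (figure from the text)


def throughput_units_needed(ingress_mb_s: float, egress_mb_s: float) -> int:
    """Smallest whole number of units covering both directions (minimum 1)."""
    needed = max(ingress_mb_s / INGRESS_MB_PER_UNIT,
                 egress_mb_s / EGRESS_MB_PER_UNIT)
    return max(1, math.ceil(needed))


if __name__ == "__main__":
    # 3.5 MB/s in dominates 4 MB/s out (which needs only 2 units).
    print(throughput_units_needed(3.5, 4.0))
```

Whichever direction needs more units wins, and because the units are shared by the namespace, the inputs should be the totals across all hubs in it.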
So what we have is a service that can scale to handle massive ingestion of events, combined with a huge buffer just in case the back end, which also features scalable consumption, can’t keep up with the rate at which messages are being sent. This gives us scale on multiple facets, as a managed, hosted service.
So about that presentation…
So the next obvious question is, “how does it work?” This is where my demos came in. I wanted to show Event Hub taking in events from multiple sources: a social media feed, a mobile app used by vendors to scan attendee badges, and proximity sensors scattered around the conference to help measure session attendance.
I started by realizing that when I consume events, I need to know what type they are (aka how to deserialize them). To make this easy, I started by defining my own custom .NET message types. I selected Twitter for the media feed, and for its messages the class declaration looks like this:
So we have who tweeted, the text of the message, and when it was created. I decorated the class with various data attributes to aid in serialization.
When a tweet is found, we’ll need a client to send the event…
This creates an EventHubClient object, using a connection string from the configuration file, and a configuration setting that defines the name of the hub I want to send to.
Next up, we need to create my event object, and serialize it.
I opted to use Newtonsoft’s .NET JSON serializer. It was already brought in by the Event Hub NuGet package. JSON is lighter weight than XML, and since Event Hub is metered on total throughput, not the number of messages, it made sense to keep the payloads as small as was convenient.
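The same idea (a small typed message, serialized to compact JSON, with a type tag travelling alongside it) can be sketched in plain Python. This is not the original .NET code: the field names and the `build_event` helper are hypothetical stand-ins for the "who, what, when" message described above.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TweetEvent:
    # Hypothetical field names standing in for "who, what, when".
    user: str
    text: str
    created_at: str  # ISO 8601 timestamp


def build_event(message):
    """Return (properties, body): a type tag plus compact UTF-8 JSON bytes."""
    properties = {"Type": type(message).__name__}
    # separators=(",", ":") drops the default whitespace to keep the payload small.
    body = json.dumps(asdict(message), separators=(",", ":")).encode("utf-8")
    return properties, body


if __name__ == "__main__":
    props, body = build_event(
        TweetEvent("someone", "Hello ThatConference", "2014-08-12T15:00:00Z"))
    print(props, len(body), "bytes")
```

Carrying the type name outside the body means a consumer can decide how to deserialize without parsing the payload first.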
Finally, I have to actually send the message:
Now you’ll recall I explained that the partition key is used to help distribute the events so that we end up with a fairly even distribution among the consuming processes. So why would I choose to bind each of my examples to a single partition? In my case, I knew that volumes would be low, so there wasn’t much risk of overloading my consuming processes. But you can also use this approach if you want to ensure that the same consuming process always gets the events from the same source, which can be really handy if the consuming process is using the events to maintain an in-memory state model of some type.
So what about consuming the events?
Events are consumed via “consumer groups”. Each group can track its position within the overall event hub ‘stream’ separately, allowing for parallel consumption of events. A default group is created when the event hub is created, but we can create our own. Consuming processes in turn create receivers, which connect to the various partitions to actually consume the events. This would normally require you to code up some rather complicated logic to ensure that if the process that owns a given set of receivers becomes unavailable, another process can pick up the slack. Fortunately, the Event Hub team thought of this already and created another NuGet package called the EventProcessorHost.
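A toy in-memory model may help picture how consumer groups track their positions independently. This is plain Python, not the service or its SDK: the `EventStream` class and its methods are hypothetical, and a single list stands in for one partition.

```python
class EventStream:
    """Toy model of one retained partition with per-consumer-group offsets."""

    def __init__(self):
        self._events = []
        # "$Default" mirrors the group that exists when a hub is created.
        self._offsets = {"$Default": 0}

    def append(self, event):
        self._events.append(event)

    def create_consumer_group(self, name):
        # New groups start reading from the beginning of the stream.
        self._offsets.setdefault(name, 0)

    def read(self, group, max_count=10):
        """Return the next batch for this group and advance only its offset."""
        start = self._offsets[group]
        batch = self._events[start:start + max_count]
        self._offsets[group] = start + len(batch)
        return batch

    def rewind(self, group):
        """Reset a group to the start so it can reprocess everything."""
        self._offsets[group] = 0


if __name__ == "__main__":
    stream = EventStream()
    for n in range(3):
        stream.append(f"event-{n}")
    stream.create_consumer_group("dashboard")
    print(stream.read("$Default", 2))
    print(stream.read("dashboard"))
```

Reading never removes anything from the stream, which is why one group falling behind (or rewinding) has no effect on another.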
Simply put, this is a handy, .NET-based approach to handling resiliency and fault tolerance of event consumers/receivers. It uses Azure Storage blobs to track which receivers are attached to a given partition in an event hub. If you add or remove consuming processes, it will redistribute the receivers accordingly. I used this approach for my presentation to create a simple console app that displays the events coming into the hub. There are really just three parts to the solution: the program itself, a receiver class, and an event processor class.
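The end result of that redistribution can be pictured with a small sketch. This is plain Python and only models the outcome (partitions spread evenly over the hosts that are currently alive); the real EventProcessorHost reaches a similar balance through blob leases, and the function name and host names below are hypothetical.

```python
def distribute_partitions(partition_ids, hosts):
    """Spread partitions evenly across hosts; returns {host: [partition ids]}.

    Models only the balanced end state, not the lease mechanics.
    """
    if not hosts:
        return {}
    assignment = {host: [] for host in hosts}
    for index, partition_id in enumerate(partition_ids):
        assignment[hosts[index % len(hosts)]].append(partition_id)
    return assignment


if __name__ == "__main__":
    partitions = list(range(16))
    print(distribute_partitions(partitions, ["host-a"]))
    # Adding a second host roughly halves each host's share.
    print(distribute_partitions(partitions, ["host-a", "host-b"]))
```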
The console program is the simplest bit of code…
We use the namespace manager to create a consumer group if the one we want doesn’t already exist. Then we instantiate a Receiver object, and tell that object to start processing events, distributing threads across the various partitions in the event hub. The NuGet package comes with its own Receiver class, so there’s not much you really need to do. The core of the receiver is in the MessageProcessingWithPartitionDistribution method.
You’ll note that this may actually be a bit different than the version that arrives with the NuGet package. This is because I’ve modified it to use a consumer group name I specify, instead of just the default name. Otherwise, it’s the same example. I get the Azure Storage connection string (where the blobs that will control our leases will go), and then use that to create an EventProcessorHost object. We then tell the host to start doing asynchronous event processing (via RegisterEventProcessorAsync). This registration actually points to our third class, which implements the IEventProcessor interface. Again, a template is provided as part of the NuGet package, so we don’t have to write the entire thing ourselves. But if you look at its ProcessEventsAsync method, we see the heart of it…
What’s happening behind the scenes is that a thread is being spun up for each partition on the Event Hub. This thread uses a blob lease to take ownership of a partition, then attaches a receiver and begins consuming the events. Each time it pulls events (by default, it will pull 10 at a time), the method I show above is called. This method just loops through the collection of events, and every minute will tell the EventProcessorHost to checkpoint our progress (record where we are in the stream). Inside the foreach loop is the code that looks at the events, deserializes them appropriately, and displays them on the program’s console.
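The shape of that loop (handle every event in the batch, but checkpoint only once the interval has elapsed) can be sketched in plain Python. This is not the IEventProcessor template from the package: the class, its attributes, and the injectable clock are hypothetical, with the clock parameter there only to make the timing easy to exercise.

```python
import time


class SimpleProcessor:
    """Handles batches of (offset, payload) and checkpoints at most once per interval."""

    def __init__(self, checkpoint_interval_s=60.0, clock=time.monotonic):
        self._interval = checkpoint_interval_s
        self._clock = clock
        self._last_checkpoint = clock()
        self.checkpointed_offset = None  # last offset recorded as "done"
        self.handled = []

    def process_events(self, batch):
        last_offset = None
        for offset, payload in batch:
            self.handled.append(payload)  # stand-in for deserialize and display
            last_offset = offset
        elapsed = self._clock() - self._last_checkpoint
        if last_offset is not None and elapsed >= self._interval:
            # Record progress so a restart resumes after this offset.
            self.checkpointed_offset = last_offset
            self._last_checkpoint = self._clock()


if __name__ == "__main__":
    processor = SimpleProcessor(checkpoint_interval_s=0.0)
    processor.process_events([(0, "a"), (1, "b")])
    print(processor.checkpointed_offset)
```

Checkpointing on a timer rather than per event keeps storage writes cheap, at the cost of possibly reprocessing up to an interval’s worth of events after a failure.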
You can see we’re checking the event’s “Type” property, and then deserializing it back into an object of the proper type. It’s a simple example, but I think it drives the point home.
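On the consuming side, the type check amounts to a lookup from the “Type” value to the class used for deserialization. A minimal sketch in plain Python follows; it is not the original .NET code, and the two message classes, their fields, and the `deserialize` helper are all hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass
class TweetEvent:
    user: str
    text: str
    created_at: str


@dataclass
class BadgeScanEvent:
    vendor: str
    attendee_id: str


# Registry from the "Type" property value to the class that can rebuild it.
EVENT_TYPES = {cls.__name__: cls for cls in (TweetEvent, BadgeScanEvent)}


def deserialize(properties, body):
    """Rebuild a typed message from an event's properties and JSON body bytes."""
    type_name = properties.get("Type")
    cls = EVENT_TYPES.get(type_name)
    if cls is None:
        raise ValueError(f"Unknown event type: {type_name!r}")
    return cls(**json.loads(body.decode("utf-8")))


if __name__ == "__main__":
    body = b'{"vendor":"booth-7","attendee_id":"A123"}'
    print(deserialize({"Type": "BadgeScanEvent"}, body))
```

Failing loudly on an unknown type is a deliberate choice here; a console viewer might prefer to log and skip instead.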
We can see some of what’s going on under the covers of the processor by looking at the blob storage account we’ve associated with our processor. First up, the EventProcessorHost creates a container in the storage account that is named the same as our event hub (so if you have multiple hubs with the same name in different namespaces, be sure to use different storage accounts). Within that container is a blob named “eventhub.info” which contains a JSON payload that describes the container and the hub.
This shows the location of the hub, when this container/file was created, and the number of partitions in the hub. Getting the number of partitions is why you must use a connection string or SAS for the hub that has manage permissions. Also within this container is one blob (zero indexed) for each partition in the hub. These blobs also contain JSON payloads.
We have the partition this file is for, the owner (aka the EventProcessorHost name we gave this), a token (presumably for the lease), an Epoch (not sure what this is for YET), and an Offset. This last value is the position we’re at in the stream. When our SimpleEventProcessor calls the CheckpointAsync method, the value of the offset is updated so we don’t read old messages again.
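To picture what a checkpoint does to one of these per-partition blobs, here is a small sketch in plain Python. The JSON field names are assumptions based on the description above rather than a documented schema, the sample values are made up, and `apply_checkpoint` is a hypothetical helper; the real host also has to hold the blob lease while writing.

```python
import json


def apply_checkpoint(lease_json: str, new_offset: str) -> str:
    """Return the lease payload with its Offset moved to new_offset."""
    lease = json.loads(lease_json)
    lease["Offset"] = new_offset  # everything else (owner, token, epoch) is untouched
    return json.dumps(lease)


if __name__ == "__main__":
    # Assumed field names and made-up values, for illustration only.
    before = json.dumps({
        "PartitionId": "0",
        "Owner": "host-a",
        "Token": "example-lease-token",
        "Epoch": 1,
        "Offset": "128",
    })
    print(apply_checkpoint(before, "4096"))
```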
Now if we spin up two copies of this application, after a minute or so you’ll see ownership of various partitions shift between the two. Messages start appearing in both and, provided you’re spreading your messages over enough partitions, you’ll even be able to see the partition keys at work as different clients get messages from specific partitions.
Ok, but what about the presentations?
Now when I started this post, I mentioned that there was a presentation and set of demos to go along with this. I’ve uploaded both for you to take away and use as you see fit. So enjoy!
Until next time!